From 750bb8c0160a0aa3fb0015cddafbec986deadb01 Mon Sep 17 00:00:00 2001 From: Martin Regen Date: Wed, 3 Apr 2024 07:28:41 +0200 Subject: [PATCH 01/10] handle reconnect activate --- .../Stack/Tcp/ChannelAsyncOperation.cs | 5 + .../Stack/Tcp/UaSCBinaryChannel.cs | 14 ++- .../Stack/Tcp/UaSCBinaryClientChannel.cs | 92 +++++++++++-------- .../Stack/Tcp/UaSCBinaryTransportChannel.cs | 6 +- 4 files changed, 70 insertions(+), 47 deletions(-) diff --git a/Stack/Opc.Ua.Core/Stack/Tcp/ChannelAsyncOperation.cs b/Stack/Opc.Ua.Core/Stack/Tcp/ChannelAsyncOperation.cs index b3e6e110b..88fdc69e4 100644 --- a/Stack/Opc.Ua.Core/Stack/Tcp/ChannelAsyncOperation.cs +++ b/Stack/Opc.Ua.Core/Stack/Tcp/ChannelAsyncOperation.cs @@ -311,6 +311,11 @@ public async Task EndAsync(int timeout, bool throwOnError = true, Cancellatio } } } + + /// + /// Return the result of the operation. + /// + public ServiceResult Error => m_error ?? ServiceResult.Good; #endregion #region IAsyncResult Members diff --git a/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryChannel.cs b/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryChannel.cs index d60c80382..a4854ca5d 100644 --- a/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryChannel.cs +++ b/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryChannel.cs @@ -99,7 +99,7 @@ public partial class UaSCUaBinaryChannel : IMessageSink, IDisposable m_discoveryOnly = false; m_uninitialized = true; - m_state = TcpChannelState.Closed; + m_state = (int)TcpChannelState.Closed; m_receiveBufferSize = quotas.MaxBufferSize; m_sendBufferSize = quotas.MaxBufferSize; m_activeWriteRequests = 0; @@ -735,16 +735,14 @@ protected int MaxResponseChunkCount /// protected TcpChannelState State { - get { return m_state; } + get => (TcpChannelState)m_state; set { - if (m_state != value) + if (Interlocked.Exchange(ref m_state, (int)value) != (int)value) { - Utils.LogTrace("ChannelId {0}: in {1} state.", ChannelId, value); + Utils.LogInfo("ChannelId {0}: in {1} state.", ChannelId, value); } - - m_state = value; } } @@ -842,7 +840,7 @@ protected static int CalculateChunkCount(int messageSize, int bufferSize) private int m_maxResponseChunkCount; private string m_contextId; - private TcpChannelState m_state; + private int m_state; private uint m_channelId; private string m_globalChannelId; private long m_sequenceNumber; @@ -858,7 +856,7 @@ protected static int CalculateChunkCount(int messageSize, int bufferSize) /// /// The possible channel states. /// - public enum TcpChannelState + public enum TcpChannelState : int { /// /// The channel is closed. diff --git a/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryClientChannel.cs b/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryClientChannel.cs index 59a746d90..ec0321a03 100644 --- a/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryClientChannel.cs +++ b/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryClientChannel.cs @@ -219,25 +219,9 @@ public async Task CloseAsync(int timeout, CancellationToken ct = default) { try { - _ = await operation.EndAsync(timeout, true, ct).ConfigureAwait(false); - } - catch (ServiceResultException e) - { - switch (e.StatusCode) - { - case StatusCodes.BadRequestInterrupted: - case StatusCodes.BadSecureChannelClosed: - { - break; - } - - default: - { - Utils.LogWarning(e, "ChannelId {0}: Could not gracefully close the channel. Reason={1}", ChannelId, e.Result.StatusCode); - break; - } + _ = await operation.EndAsync(timeout, false, ct).ConfigureAwait(false); + ValidateChannelCloseError(operation.Error); } - } catch (Exception e) { Utils.LogError(e, "ChannelId {0}: Could not gracefully close the channel.", ChannelId); @@ -261,23 +245,7 @@ public void Close(int timeout) try { operation.End(timeout, false); - } - catch (ServiceResultException e) - { - switch (e.StatusCode) - { - case StatusCodes.BadRequestInterrupted: - case StatusCodes.BadSecureChannelClosed: - { - break; - } - - default: - { - Utils.LogWarning(e, "ChannelId {0}: Could not gracefully close the channel. Reason={1}", ChannelId, e.Result.StatusCode); - break; - } - } + ValidateChannelCloseError(operation.Error); } catch (Exception e) { @@ -323,9 +291,10 @@ public IAsyncResult BeginSendRequest(IServiceRequest request, int timeout, Async if (m_queuedOperations != null) { operation = BeginOperation(timeout, callback, state); - m_queuedOperations.Add(new QueuedOperation(operation, timeout, request)); - if (firstCall) + bool validConnectOperation = QueueConnectOperation(operation, timeout, request); + + if (firstCall && validConnectOperation) { BeginConnect(m_url, timeout, OnConnectOnDemandComplete, null); } @@ -811,6 +780,55 @@ protected override bool HandleIncomingMessage(uint messageType, ArraySegment + /// Validates the result of a channel close operation. + /// + private void ValidateChannelCloseError(ServiceResult error) + { + if (ServiceResult.IsBad(error)) + { + StatusCode statusCode = error.StatusCode; + switch ((uint)statusCode) + { + case StatusCodes.BadRequestInterrupted: + case StatusCodes.BadSecureChannelClosed: + { + break; + } + + default: + { + Utils.LogWarning("ChannelId {0}: Could not gracefully close the channel. Reason={1}", ChannelId, error); + break; + } + } + } + } + + /// + /// Queues an operation for sending after the channel is connected. + /// Inserts operations that create or activate a session or don't require a session first. + /// + /// true if a valid service call for BeginConnect is queued. + private bool QueueConnectOperation(WriteOperation operation, int timeout, IServiceRequest request) + { + var queuedOperation = new QueuedOperation(operation, timeout, request); + + // operations that must be sent first and which allow for a connect. + if (request.TypeId == DataTypeIds.ActivateSessionRequest || + request.TypeId == DataTypeIds.CreateSessionRequest || + request.TypeId == DataTypeIds.GetEndpointsRequest || + request.TypeId == DataTypeIds.FindServersOnNetworkRequest || + request.TypeId == DataTypeIds.FindServersRequest) + { + m_queuedOperations.Insert(0, queuedOperation); + return true; + } + + m_queuedOperations.Add(queuedOperation); + return false; + } + /// /// Called when the socket is connected. /// diff --git a/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryTransportChannel.cs b/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryTransportChannel.cs index c4357525d..0866a9145 100644 --- a/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryTransportChannel.cs +++ b/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryTransportChannel.cs @@ -359,9 +359,11 @@ public IAsyncResult BeginSendRequest(IServiceRequest request, AsyncCallback call if (channel == null) { channel = CreateChannel(); - if (Interlocked.CompareExchange(ref m_channel, channel, null) != null) + var currentChannel = Interlocked.CompareExchange(ref m_channel, channel, null); + if (currentChannel != null) { - channel = m_channel; + Utils.SilentDispose(channel); + channel = currentChannel; } } From 9f725015a4b999bf6978ce7aae07d392afbfe580 Mon Sep 17 00:00:00 2001 From: Martin Regen Date: Wed, 3 Apr 2024 07:41:59 +0200 Subject: [PATCH 02/10] return BadSecureChannelClosed instead of ObjectDisposed --- Stack/Opc.Ua.Core/Stack/Client/ClientBase.cs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/Stack/Opc.Ua.Core/Stack/Client/ClientBase.cs b/Stack/Opc.Ua.Core/Stack/Client/ClientBase.cs index f5d033e86..61e5df64f 100644 --- a/Stack/Opc.Ua.Core/Stack/Client/ClientBase.cs +++ b/Stack/Opc.Ua.Core/Stack/Client/ClientBase.cs @@ -93,7 +93,7 @@ public ITransportChannel NullableTransportChannel { if (m_disposed) { - throw new ObjectDisposedException("ClientBase has been disposed."); + throw new ServiceResultException(StatusCodes.BadSecureChannelClosed, "Channel has been closed."); } } @@ -110,17 +110,13 @@ public ITransportChannel TransportChannel if (channel != null) { - if (m_disposed) + if (!m_disposed) { - throw new ObjectDisposedException("ClientBase has been disposed."); + return channel; } } - else - { - throw new ServiceResultException(StatusCodes.BadSecureChannelClosed, "Channel has been closed."); - } - return channel; + throw new ServiceResultException(StatusCodes.BadSecureChannelClosed, "Channel has been closed."); } protected set From e07d3f97b6fe097a2ce00e6213ab942b2eff37a9 Mon Sep 17 00:00:00 2001 From: Martin Regen Date: Mon, 13 May 2024 08:04:56 +0200 Subject: [PATCH 03/10] fix states --- Libraries/Opc.Ua.Client/Session.cs | 78 +++++++++++++------ Libraries/Opc.Ua.Client/SessionAsync.cs | 8 +- .../Opc.Ua.Client/SessionReconnectHandler.cs | 2 +- .../Stack/Tcp/UaSCBinaryChannel.cs | 12 ++- .../Stack/Tcp/UaSCBinaryClientChannel.cs | 46 +++++++++-- 5 files changed, 111 insertions(+), 35 deletions(-) diff --git a/Libraries/Opc.Ua.Client/Session.cs b/Libraries/Opc.Ua.Client/Session.cs index c30d5c935..9de6e915b 100644 --- a/Libraries/Opc.Ua.Client/Session.cs +++ b/Libraries/Opc.Ua.Client/Session.cs @@ -27,6 +27,10 @@ * http://opcfoundation.org/License/MIT/1.00/ * ======================================================================*/ +#if NET6_0_OR_GREATER +#define PERIODIC_TIMER +#endif + using System; using System.Collections.Generic; using System.Diagnostics; @@ -3709,7 +3713,7 @@ private void StartKeepAliveTimer() { StopKeepAliveTimer(); -#if NET6_0_OR_GREATER +#if PERIODIC_TIMER // start periodic timer loop var keepAliveTimer = new PeriodicTimer(TimeSpan.FromMilliseconds(keepAliveInterval)); _ = Task.Run(() => OnKeepAliveAsync(keepAliveTimer, nodesToRead)); @@ -3821,7 +3825,7 @@ private void AsyncRequestCompleted(IAsyncResult result, uint requestId, uint typ } } -#if NET6_0_OR_GREATER +#if PERIODIC_TIMER /// /// Sends a keep alive by reading from the server. /// @@ -3893,6 +3897,11 @@ private void OnSendKeepAlive(ReadValueIdCollection nodesToRead) AsyncRequestStarted(result, requestHeader.RequestHandle, DataTypes.ReadRequest); } + catch (ServiceResultException sre) when (sre.StatusCode == StatusCodes.BadNotConnected) + { + // recover from error condition when secure channel is still alive + OnKeepAliveError(sre.Result); + } catch (Exception e) { Utils.LogError("Could not send keep alive request: {0} {1}", e.GetType().FullName, e.Message); @@ -3935,10 +3944,10 @@ private void OnKeepAliveComplete(IAsyncResult result) return; } - catch (ServiceResultException sre) when (sre.StatusCode == StatusCodes.BadSessionIdInvalid) + catch (ServiceResultException sre) { // recover from error condition when secure channel is still alive - OnKeepAliveError(ServiceResult.Create(StatusCodes.BadSessionIdInvalid, "Session unavailable for keep alive requests.")); + OnKeepAliveError(sre.Result); } catch (Exception e) { @@ -4003,14 +4012,24 @@ protected virtual void OnKeepAlive(ServerState currentState, DateTime currentTim /// protected virtual bool OnKeepAliveError(ServiceResult result) { - long delta = DateTime.UtcNow.Ticks - Interlocked.Read(ref m_lastKeepAliveTime); - + long ticks = DateTime.UtcNow.Ticks; + if (result.StatusCode == StatusCodes.BadNoCommunication) + { + // keep alive read timed out + long delta = ticks - Interlocked.Read(ref m_lastKeepAliveTime); Utils.LogInfo( "KEEP ALIVE LATE: {0}s, EndpointUrl={1}, RequestCount={2}/{3}", ((double)delta) / TimeSpan.TicksPerSecond, this.Endpoint?.EndpointUrl, this.GoodPublishRequestCount, this.OutstandingRequestCount); + } + else + { + // another error triggered the error, ensure that KeepAliveStopped is true + Interlocked.Exchange(ref m_lastKeepAliveTime, + ticks - ((m_keepAliveInterval + kKeepAliveGuardBand) * TimeSpan.TicksPerMillisecond)); + } KeepAliveEventHandler callback = m_KeepAlive; @@ -5074,14 +5093,18 @@ private void OnPublishComplete(IAsyncResult result) case StatusCodes.BadNoSubscription: case StatusCodes.BadSessionClosed: - case StatusCodes.BadSessionIdInvalid: - case StatusCodes.BadSecureChannelIdInvalid: - case StatusCodes.BadSecureChannelClosed: case StatusCodes.BadSecurityChecksFailed: case StatusCodes.BadCertificateInvalid: case StatusCodes.BadServerHalted: return; + // may require a reconnect or activate to recover + case StatusCodes.BadSessionIdInvalid: + case StatusCodes.BadSecureChannelIdInvalid: + case StatusCodes.BadSecureChannelClosed: + OnKeepAliveError(error); + return; + // Servers may return this error when overloaded case StatusCodes.BadTooManyOperations: case StatusCodes.BadTcpServerTooBusy: @@ -5089,10 +5112,13 @@ private void OnPublishComplete(IAsyncResult result) // throttle the next publish to reduce server load _ = Task.Run(async () => { await Task.Delay(100).ConfigureAwait(false); - BeginPublish(OperationTimeout); + QueueBeginPublish(); }); return; + case StatusCodes.BadTimeout: + break; + default: Utils.LogError(e, "PUBLISH #{0} - Unhandled error {1} during Publish.", requestHeader.RequestHandle, error.StatusCode); goto case StatusCodes.BadServerTooBusy; @@ -5100,17 +5126,7 @@ private void OnPublishComplete(IAsyncResult result) } } - int requestCount = GoodPublishRequestCount; - int minPublishRequestCount = GetMinPublishRequestCount(false); - - if (requestCount < minPublishRequestCount) - { - BeginPublish(OperationTimeout); - } - else - { - Utils.LogInfo("PUBLISH - Did not send another publish request. GoodPublishRequestCount={0}, MinPublishRequestCount={1}", requestCount, minPublishRequestCount); - } + QueueBeginPublish(); } /// @@ -5201,6 +5217,24 @@ public bool ResendData(IEnumerable subscriptions, out IList + /// Queues a publish request if there are not enough outstanding requests. + /// + private void QueueBeginPublish() + { + int requestCount = GoodPublishRequestCount; + int minPublishRequestCount = GetMinPublishRequestCount(false); + + if (requestCount < minPublishRequestCount) + { + BeginPublish(OperationTimeout); + } + else + { + Utils.LogInfo("PUBLISH - Did not send another publish request. GoodPublishRequestCount={0}, MinPublishRequestCount={1}", requestCount, minPublishRequestCount); + } + } + /// /// Validates the identity for an open call. /// @@ -6317,7 +6351,7 @@ private static void UpdateLatestSequenceNumberToSend(ref uint latestSequenceNumb private long m_lastKeepAliveTime; private ServerState m_serverState; private int m_keepAliveInterval; -#if NET6_0_OR_GREATER +#if PERIODIC_TIMER private PeriodicTimer m_keepAliveTimer; #else private Timer m_keepAliveTimer; diff --git a/Libraries/Opc.Ua.Client/SessionAsync.cs b/Libraries/Opc.Ua.Client/SessionAsync.cs index f4eb6def5..b57f2f6ea 100644 --- a/Libraries/Opc.Ua.Client/SessionAsync.cs +++ b/Libraries/Opc.Ua.Client/SessionAsync.cs @@ -1332,6 +1332,11 @@ public static async Task RecreateAsync(Session sessionTemplate, ITransp /// The new session object. public static async Task RecreateAsync(Session sessionTemplate, ITransportChannel transportChannel, CancellationToken ct = default) { + if (transportChannel == null) + { + return await Session.RecreateAsync(sessionTemplate, ct).ConfigureAwait(false); + } + ServiceMessageContext messageContext = sessionTemplate.m_configuration.CreateMessageContext(); messageContext.Factory = sessionTemplate.Factory; @@ -1507,7 +1512,8 @@ private async Task ReconnectAsync(ITransportWaitingConnection connection, ITrans } catch (ServiceResultException) { - Utils.LogWarning("WARNING: ACTIVATE SESSION {0} timed out. {1}/{2}", SessionId, GoodPublishRequestCount, OutstandingRequestCount); + Utils.LogWarning("WARNING: ACTIVATE SESSION ASYNC {0} timed out. {1}/{2}", SessionId, GoodPublishRequestCount, OutstandingRequestCount); + throw; } // reactivate session. diff --git a/Libraries/Opc.Ua.Client/SessionReconnectHandler.cs b/Libraries/Opc.Ua.Client/SessionReconnectHandler.cs index 9ec0ac072..c6cc8cb73 100644 --- a/Libraries/Opc.Ua.Client/SessionReconnectHandler.cs +++ b/Libraries/Opc.Ua.Client/SessionReconnectHandler.cs @@ -315,8 +315,8 @@ private async void OnReconnectAsync(object state) keepaliveRecovered = true; // breaking change, the callback must only assign the new // session if the property is != null - m_session = null; Utils.LogInfo("Reconnect {0} aborted, KeepAlive recovered.", m_session?.SessionId); + m_session = null; } else { diff --git a/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryChannel.cs b/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryChannel.cs index 4b5428962..ccd5b9845 100644 --- a/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryChannel.cs +++ b/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryChannel.cs @@ -447,10 +447,15 @@ protected virtual void OnWriteComplete(object sender, IMessageSocketAsyncEventAr protected void BeginWriteMessage(ArraySegment buffer, object state) { ServiceResult error = ServiceResult.Good; - IMessageSocketAsyncEventArgs args = null; + IMessageSocketAsyncEventArgs args = m_socket?.MessageSocketEventArgs(); + + if (args == null) + { + throw ServiceResultException.Create(StatusCodes.BadConnectionClosed, "The socket was closed by the remote application."); + } + try { - args = m_socket.MessageSocketEventArgs(); Interlocked.Increment(ref m_activeWriteRequests); args.SetBuffer(buffer.Array, buffer.Offset, buffer.Count); args.Completed += OnWriteComplete; @@ -743,7 +748,7 @@ protected TcpChannelState State { if (Interlocked.Exchange(ref m_state, (int)value) != (int)value) { - Utils.LogInfo("ChannelId {0}: in {1} state.", ChannelId, value); + Utils.LogTrace("ChannelId {0}: in {1} state.", ChannelId, value); } } } @@ -872,6 +877,7 @@ public void UpdateLastActiveTime() private int m_maxResponseChunkCount; private string m_contextId; + // treat TcpChannelState as int to use Interlocked private int m_state; private uint m_channelId; private string m_globalChannelId; diff --git a/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryClientChannel.cs b/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryClientChannel.cs index 5d5b84bd6..a77a832a9 100644 --- a/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryClientChannel.cs +++ b/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryClientChannel.cs @@ -144,6 +144,10 @@ public IAsyncResult BeginConnect(Uri url, int timeout, AsyncCallback callback, o m_handshakeOperation = operation; State = TcpChannelState.Connecting; + + // set the state. + ChannelStateChanged(TcpChannelState.Connecting, ServiceResult.Good); + if (ReverseSocket) { if (Socket != null) @@ -221,7 +225,7 @@ public async Task CloseAsync(int timeout, CancellationToken ct = default) { _ = await operation.EndAsync(timeout, false, ct).ConfigureAwait(false); ValidateChannelCloseError(operation.Error); - } + } catch (Exception e) { Utils.LogError(e, "ChannelId {0}: Could not gracefully close the channel.", ChannelId); @@ -485,6 +489,9 @@ private bool ProcessAcknowledgeMessage(ArraySegment messageChunk) // ready to open the channel. State = TcpChannelState.Opening; + // set the state. + ChannelStateChanged(TcpChannelState.Opening, ServiceResult.Good); + try { // check if reconnecting after a socket failure. @@ -668,6 +675,9 @@ private bool ProcessOpenSecureChannelResponse(uint messageType, ArraySegment @@ -814,7 +824,17 @@ private bool QueueConnectOperation(WriteOperation operation, int timeout, IServi return true; } - m_queuedOperations.Add(queuedOperation); + // fail until a valid service call for BeginConnect is queued. + if (m_queuedOperations.Count == 0) + { + operation.Fault(StatusCodes.BadSecureChannelClosed); + throw new ServiceResultException(StatusCodes.BadNotConnected); + } + else + { + m_queuedOperations.Add(queuedOperation); + } + return false; } @@ -825,8 +845,7 @@ private void OnConnectComplete(object sender, IMessageSocketAsyncEventArgs e) { WriteOperation operation = (WriteOperation)e.UserToken; - // dual stack ConnectAsync may call in with null UserToken if - // one connection attempt timed out but the other succeeded + // ConnectAsync may call in with a null UserToken, ignore if (operation == null) { return; @@ -924,6 +943,9 @@ private void OnScheduledHandshake(object state) Socket = null; } + // set the state. + ChannelStateChanged(TcpChannelState.Closed, ServiceResult.Good); + if (!ReverseSocket) { // create an operation. @@ -931,6 +953,10 @@ private void OnScheduledHandshake(object state) State = TcpChannelState.Connecting; Socket = m_socketFactory.Create(this, BufferManager, Quotas.MaxBufferSize); + + // set the state. + ChannelStateChanged(TcpChannelState.Connecting, ServiceResult.Good); + Socket.BeginConnect(m_via, m_ConnectCallback, m_handshakeOperation); } } @@ -1080,7 +1106,7 @@ private void Shutdown(ServiceResult reason) } // halt any existing handshake. - if (m_handshakeOperation != null && !m_handshakeOperation.IsCompleted) + if (m_handshakeOperation?.IsCompleted == false) { m_handshakeOperation.Fault(reason); } @@ -1305,14 +1331,15 @@ private void OnConnectOnDemandComplete(object state) } catch (Exception e) { - request.Operation.Fault(e, StatusCodes.BadNoCommunication, "Error establishing a connection: " + e.Message); - break; + request.Operation.Fault(StatusCodes.BadNoCommunication, "Error establishing a connection: " + e.Message); + continue; } } if (this.CurrentToken == null) { request.Operation.Fault(StatusCodes.BadConnectionClosed, "Could not send request because connection is closed."); + continue; } try @@ -1354,6 +1381,9 @@ private WriteOperation InternalClose(int timeout) State = TcpChannelState.Closing; operation = BeginOperation(timeout, null, null); SendCloseSecureChannelRequest(operation); + + // set the state. + ChannelStateChanged(TcpChannelState.Closing, ServiceResult.Good); } } From bf336d08668f6c242c14bb936066daa93ad1b104 Mon Sep 17 00:00:00 2001 From: Martin Regen Date: Mon, 13 May 2024 08:12:03 +0200 Subject: [PATCH 04/10] safe operation --- Stack/Opc.Ua.Core/Stack/Tcp/ChannelAsyncOperation.cs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/Stack/Opc.Ua.Core/Stack/Tcp/ChannelAsyncOperation.cs b/Stack/Opc.Ua.Core/Stack/Tcp/ChannelAsyncOperation.cs index 88fdc69e4..891a80252 100644 --- a/Stack/Opc.Ua.Core/Stack/Tcp/ChannelAsyncOperation.cs +++ b/Stack/Opc.Ua.Core/Stack/Tcp/ChannelAsyncOperation.cs @@ -426,19 +426,20 @@ protected virtual bool InternalComplete(bool doNotBlock, object result) } } - if (m_callback != null) + AsyncCallback callback = m_callback; + if (callback != null) { if (doNotBlock) { Task.Run(() => { - m_callback(this); + callback(this); }); } else { try { - m_callback(this); + callback(this); } catch (Exception e) { From f7a7776dcfe0e33d19f7ef226f6812c6597981c1 Mon Sep 17 00:00:00 2001 From: Martin Regen Date: Mon, 13 May 2024 14:32:25 +0200 Subject: [PATCH 05/10] use last statuscode to determin if the timeout should be used --- Libraries/Opc.Ua.Client/Session.cs | 49 ++++++++++++------- Libraries/Opc.Ua.Client/SessionAsync.cs | 11 +++-- .../Opc.Ua.Client/SessionReconnectHandler.cs | 14 ++++-- 3 files changed, 49 insertions(+), 25 deletions(-) diff --git a/Libraries/Opc.Ua.Client/Session.cs b/Libraries/Opc.Ua.Client/Session.cs index 9de6e915b..03b18686a 100644 --- a/Libraries/Opc.Ua.Client/Session.cs +++ b/Libraries/Opc.Ua.Client/Session.cs @@ -44,6 +44,7 @@ using System.Threading.Tasks; using System.Xml; using Microsoft.Extensions.Logging; +using Opc.Ua.Bindings; namespace Opc.Ua.Client { @@ -745,17 +746,25 @@ public int KeepAliveInterval /// Returns true if the session is not receiving keep alives. /// /// - /// Set to true if the server does not respond for 2 times the KeepAliveInterval. - /// Set to false is communication recovers. + /// Set to true if the server does not respond for 2 times the KeepAliveInterval + /// or if another error was reported. + /// Set to false is communication is ok or recovered. /// public bool KeepAliveStopped { get { - TimeSpan delta = TimeSpan.FromTicks(DateTime.UtcNow.Ticks - Interlocked.Read(ref m_lastKeepAliveTime)); + StatusCode lastKeepAliveErrorStatusCode = m_lastKeepAliveErrorStatusCode; + if (StatusCode.IsGood(lastKeepAliveErrorStatusCode) || lastKeepAliveErrorStatusCode == StatusCodes.BadNoCommunication) + { + TimeSpan delta = TimeSpan.FromTicks(DateTime.UtcNow.Ticks - Interlocked.Read(ref m_lastKeepAliveTime)); + + // add a guard band to allow for network lag. + return (m_keepAliveInterval + kKeepAliveGuardBand) <= delta.TotalMilliseconds; + } - // add a guard band to allow for network lag. - return (m_keepAliveInterval + kKeepAliveGuardBand) <= delta.TotalMilliseconds; + // another error was reported which caused keep alive to stop. + return true; } } @@ -1431,9 +1440,13 @@ private void Reconnect(ITransportWaitingConnection connection, ITransportChannel connection, transportChannel); + if (!(result is ChannelAsyncOperation operation)) throw new ArgumentNullException(nameof(result)); + if (!result.AsyncWaitHandle.WaitOne(kReconnectTimeout / 2)) { - Utils.LogWarning("WARNING: ACTIVATE SESSION timed out. {0}/{1}", GoodPublishRequestCount, OutstandingRequestCount); + var error = ServiceResult.Create(StatusCodes.BadRequestTimeout, "ACTIVATE SESSION timed out. {0}/{1}", GoodPublishRequestCount, OutstandingRequestCount); + Utils.LogWarning("WARNING: {0}", error.ToString()); + operation.Fault(false, error); } // reactivate session. @@ -3694,6 +3707,7 @@ private void StartKeepAliveTimer() { int keepAliveInterval = m_keepAliveInterval; + m_lastKeepAliveErrorStatusCode = StatusCodes.Good; Interlocked.Exchange(ref m_lastKeepAliveTime, DateTime.UtcNow.Ticks); m_serverState = ServerState.Unknown; @@ -3969,6 +3983,7 @@ protected virtual void OnKeepAlive(ServerState currentState, DateTime currentTim return; } + m_lastKeepAliveErrorStatusCode = StatusCodes.Good; Interlocked.Exchange(ref m_lastKeepAliveTime, DateTime.UtcNow.Ticks); lock (m_outstandingRequests) @@ -3986,6 +4001,7 @@ protected virtual void OnKeepAlive(ServerState currentState, DateTime currentTim } else { + m_lastKeepAliveErrorStatusCode = StatusCodes.Good; Interlocked.Exchange(ref m_lastKeepAliveTime, DateTime.UtcNow.Ticks); } @@ -4013,22 +4029,18 @@ protected virtual void OnKeepAlive(ServerState currentState, DateTime currentTim protected virtual bool OnKeepAliveError(ServiceResult result) { long ticks = DateTime.UtcNow.Ticks; + + m_lastKeepAliveErrorStatusCode = result.StatusCode; if (result.StatusCode == StatusCodes.BadNoCommunication) { // keep alive read timed out long delta = ticks - Interlocked.Read(ref m_lastKeepAliveTime); - Utils.LogInfo( - "KEEP ALIVE LATE: {0}s, EndpointUrl={1}, RequestCount={2}/{3}", - ((double)delta) / TimeSpan.TicksPerSecond, - this.Endpoint?.EndpointUrl, - this.GoodPublishRequestCount, - this.OutstandingRequestCount); - } - else - { - // another error triggered the error, ensure that KeepAliveStopped is true - Interlocked.Exchange(ref m_lastKeepAliveTime, - ticks - ((m_keepAliveInterval + kKeepAliveGuardBand) * TimeSpan.TicksPerMillisecond)); + Utils.LogInfo( + "KEEP ALIVE LATE: {0}s, EndpointUrl={1}, RequestCount={2}/{3}", + ((double)delta) / TimeSpan.TicksPerSecond, + this.Endpoint?.EndpointUrl, + this.GoodPublishRequestCount, + this.OutstandingRequestCount); } KeepAliveEventHandler callback = m_KeepAlive; @@ -6349,6 +6361,7 @@ private static void UpdateLatestSequenceNumberToSend(ref uint latestSequenceNumb private long m_publishCounter; private int m_tooManyPublishRequests; private long m_lastKeepAliveTime; + private StatusCode m_lastKeepAliveErrorStatusCode; private ServerState m_serverState; private int m_keepAliveInterval; #if PERIODIC_TIMER diff --git a/Libraries/Opc.Ua.Client/SessionAsync.cs b/Libraries/Opc.Ua.Client/SessionAsync.cs index b57f2f6ea..cccbf57aa 100644 --- a/Libraries/Opc.Ua.Client/SessionAsync.cs +++ b/Libraries/Opc.Ua.Client/SessionAsync.cs @@ -1510,10 +1510,15 @@ private async Task ReconnectAsync(ITransportWaitingConnection connection, ITrans { _ = await operation.EndAsync(kReconnectTimeout / 2, true, ct).ConfigureAwait(false); } - catch (ServiceResultException) + catch (ServiceResultException sre) { - Utils.LogWarning("WARNING: ACTIVATE SESSION ASYNC {0} timed out. {1}/{2}", SessionId, GoodPublishRequestCount, OutstandingRequestCount); - throw; + if (sre.StatusCode == StatusCodes.BadRequestInterrupted) + { + var error = ServiceResult.Create(StatusCodes.BadRequestTimeout, "ACTIVATE SESSION timed out. {0}/{1}", + GoodPublishRequestCount, OutstandingRequestCount); + Utils.LogWarning("WARNING: {0}", error.ToString()); + operation.Fault(false, error); + } } // reactivate session. diff --git a/Libraries/Opc.Ua.Client/SessionReconnectHandler.cs b/Libraries/Opc.Ua.Client/SessionReconnectHandler.cs index c6cc8cb73..eb2e9d4ca 100644 --- a/Libraries/Opc.Ua.Client/SessionReconnectHandler.cs +++ b/Libraries/Opc.Ua.Client/SessionReconnectHandler.cs @@ -375,6 +375,7 @@ private async Task DoReconnectAsync() // helper to override operation timeout int operationTimeout = m_session.OperationTimeout; int reconnectOperationTimeout = Math.Max(m_reconnectPeriod, MinReconnectOperationTimeout); + ITransportChannel transportChannel = null; // try a reconnect. if (!m_reconnectFailed) @@ -429,10 +430,15 @@ private async Task DoReconnectAsync() m_updateFromServer = true; Utils.LogInfo("Reconnect failed due to security check. Request endpoint update from server. {0}", sre.Message); } - // wait for next scheduled reconnect if connection failed, - // otherwise recreate session immediately - else if (sre.StatusCode != StatusCodes.BadSessionIdInvalid) + // recreate session immediately, use existing channel + else if (sre.StatusCode == StatusCodes.BadSessionIdInvalid) + { + transportChannel = m_session.NullableTransportChannel; + m_session.DetachChannel(); + } + else { + // wait for next scheduled reconnect if connection failed, // next attempt is to recreate session m_reconnectFailed = true; return false; @@ -492,7 +498,7 @@ private async Task DoReconnectAsync() m_updateFromServer = false; } - session = await m_session.SessionFactory.RecreateAsync(m_session).ConfigureAwait(false); + session = await m_session.SessionFactory.RecreateAsync(m_session, transportChannel).ConfigureAwait(false); } // note: the template session is not connected at this point // and must be disposed by the owner From 26a3c8608d68696a2fa327fca2f0b7f483327c0b Mon Sep 17 00:00:00 2001 From: Martin Regen Date: Tue, 14 May 2024 11:30:29 +0200 Subject: [PATCH 06/10] disable warning --- Stack/Opc.Ua.Bindings.Https/Opc.Ua.Bindings.Https.csproj | 1 + .../Opc.Ua.Bindings.Https/Stack/Https/HttpsTransportChannel.cs | 3 +-- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Stack/Opc.Ua.Bindings.Https/Opc.Ua.Bindings.Https.csproj b/Stack/Opc.Ua.Bindings.Https/Opc.Ua.Bindings.Https.csproj index 02b621cfa..73b3dc781 100644 --- a/Stack/Opc.Ua.Bindings.Https/Opc.Ua.Bindings.Https.csproj +++ b/Stack/Opc.Ua.Bindings.Https/Opc.Ua.Bindings.Https.csproj @@ -1,6 +1,7 @@ + true $(HttpsTargetFrameworks) Opc.Ua.Bindings.Https OPCFoundation.NetStandard.Opc.Ua.Bindings.Https diff --git a/Stack/Opc.Ua.Bindings.Https/Stack/Https/HttpsTransportChannel.cs b/Stack/Opc.Ua.Bindings.Https/Stack/Https/HttpsTransportChannel.cs index f7061f1d2..21601552d 100644 --- a/Stack/Opc.Ua.Bindings.Https/Stack/Https/HttpsTransportChannel.cs +++ b/Stack/Opc.Ua.Bindings.Https/Stack/Https/HttpsTransportChannel.cs @@ -17,9 +17,8 @@ using System.Net.Http; using System.Net.Http.Headers; using System.Net.Security; -using System.Runtime.InteropServices; -using System.Security.Cryptography.X509Certificates; using System.Reflection; +using System.Security.Cryptography.X509Certificates; using System.Threading; using System.Threading.Tasks; From ded660fb291b16918803f21aaa5e8f945c28f105 Mon Sep 17 00:00:00 2001 From: Martin Regen Date: Tue, 14 May 2024 11:49:22 +0200 Subject: [PATCH 07/10] fix tests --- Libraries/Opc.Ua.Client/Session.cs | 5 +- Libraries/Opc.Ua.Client/SessionAsync.cs | 30 +++-- Tests/Opc.Ua.Client.Tests/ClientTest.cs | 26 ++-- .../ClientTestFramework.cs | 7 +- Tests/Opc.Ua.Client.Tests/SubscriptionTest.cs | 123 +++++++++--------- .../Properties/launchSettings.json | 8 ++ Tests/Opc.Ua.Server.Tests/ServerFixture.cs | 9 +- 7 files changed, 119 insertions(+), 89 deletions(-) create mode 100644 Tests/Opc.Ua.Core.Tests/Properties/launchSettings.json diff --git a/Libraries/Opc.Ua.Client/Session.cs b/Libraries/Opc.Ua.Client/Session.cs index 03b18686a..9bf81470c 100644 --- a/Libraries/Opc.Ua.Client/Session.cs +++ b/Libraries/Opc.Ua.Client/Session.cs @@ -1440,13 +1440,12 @@ private void Reconnect(ITransportWaitingConnection connection, ITransportChannel connection, transportChannel); - if (!(result is ChannelAsyncOperation operation)) throw new ArgumentNullException(nameof(result)); - if (!result.AsyncWaitHandle.WaitOne(kReconnectTimeout / 2)) { var error = ServiceResult.Create(StatusCodes.BadRequestTimeout, "ACTIVATE SESSION timed out. {0}/{1}", GoodPublishRequestCount, OutstandingRequestCount); Utils.LogWarning("WARNING: {0}", error.ToString()); - operation.Fault(false, error); + var operation = result as ChannelAsyncOperation; + operation?.Fault(false, error); } // reactivate session. diff --git a/Libraries/Opc.Ua.Client/SessionAsync.cs b/Libraries/Opc.Ua.Client/SessionAsync.cs index cccbf57aa..58bb21e34 100644 --- a/Libraries/Opc.Ua.Client/SessionAsync.cs +++ b/Libraries/Opc.Ua.Client/SessionAsync.cs @@ -1504,21 +1504,27 @@ private async Task ReconnectAsync(ITransportWaitingConnection connection, ITrans connection, transportChannel); - if (!(result is ChannelAsyncOperation operation)) throw new ArgumentNullException(nameof(result)); - - try - { - _ = await operation.EndAsync(kReconnectTimeout / 2, true, ct).ConfigureAwait(false); - } - catch (ServiceResultException sre) + const string timeoutMessage = "ACTIVATE SESSION ASYNC timed out. {0}/{1}"; + if (result is ChannelAsyncOperation operation) { - if (sre.StatusCode == StatusCodes.BadRequestInterrupted) + try { - var error = ServiceResult.Create(StatusCodes.BadRequestTimeout, "ACTIVATE SESSION timed out. {0}/{1}", - GoodPublishRequestCount, OutstandingRequestCount); - Utils.LogWarning("WARNING: {0}", error.ToString()); - operation.Fault(false, error); + _ = await operation.EndAsync(kReconnectTimeout / 2, true, ct).ConfigureAwait(false); } + catch (ServiceResultException sre) + { + if (sre.StatusCode == StatusCodes.BadRequestInterrupted) + { + var error = ServiceResult.Create(StatusCodes.BadRequestTimeout, timeoutMessage, + GoodPublishRequestCount, OutstandingRequestCount); + Utils.LogWarning("WARNING: {0}", error.ToString()); + operation.Fault(false, error); + } + } + } + else if (!result.AsyncWaitHandle.WaitOne(kReconnectTimeout / 2)) + { + Utils.LogWarning(timeoutMessage, GoodPublishRequestCount, OutstandingRequestCount); } // reactivate session. diff --git a/Tests/Opc.Ua.Client.Tests/ClientTest.cs b/Tests/Opc.Ua.Client.Tests/ClientTest.cs index b1247370a..f92db031c 100644 --- a/Tests/Opc.Ua.Client.Tests/ClientTest.cs +++ b/Tests/Opc.Ua.Client.Tests/ClientTest.cs @@ -682,12 +682,12 @@ public async Task ReconnectSessionOnAlternateChannel(bool closeChannel) // cannot read using a closed channel, validate the status code sre = Assert.Throws(() => session1.ReadValue(VariableIds.Server_ServerStatus, typeof(ServerStatusDataType))); - // TODO: Both channel should return BadSecureChannelClosed + // TODO: Both channel should return BadNotConnected if (!(StatusCodes.BadSecureChannelClosed == sre.StatusCode)) { if (endpoint.EndpointUrl.ToString().StartsWith(Utils.UriSchemeOpcTcp, StringComparison.Ordinal)) { - Assert.AreEqual((StatusCode)StatusCodes.BadSessionIdInvalid, (StatusCode)sre.StatusCode, sre.Message); + Assert.AreEqual((StatusCode)StatusCodes.BadNotConnected, (StatusCode)sre.StatusCode, sre.Message); } else { @@ -701,11 +701,13 @@ public async Task ReconnectSessionOnAlternateChannel(bool closeChannel) /// the same session on a new channel with saved session secrets /// [Test, Order(260)] - [TestCase(SecurityPolicies.None, true)] - [TestCase(SecurityPolicies.None, false)] - [TestCase(SecurityPolicies.Basic256Sha256, true)] - [TestCase(SecurityPolicies.Basic256Sha256, false)] - public async Task ReconnectSessionOnAlternateChannelWithSavedSessionSecrets(string securityPolicy, bool anonymous) + [TestCase(SecurityPolicies.None, true, false)] + [TestCase(SecurityPolicies.None, false, false)] + [TestCase(SecurityPolicies.None, false, true)] + [TestCase(SecurityPolicies.Basic256Sha256, true, false)] + [TestCase(SecurityPolicies.Basic256Sha256, false, false)] + [TestCase(SecurityPolicies.Basic256Sha256, false, true)] + public async Task ReconnectSessionOnAlternateChannelWithSavedSessionSecrets(string securityPolicy, bool anonymous, bool asyncReconnect) { ServiceResultException sre; @@ -756,8 +758,14 @@ public async Task ReconnectSessionOnAlternateChannelWithSavedSessionSecrets(stri }; // activate the session from saved session secrets on the new channel - session2.Reconnect(channel2); - + if (asyncReconnect) + { + await session2.ReconnectAsync(channel2, CancellationToken.None).ConfigureAwait(false); + } + else + { + session2.Reconnect(channel2); + } Thread.Sleep(500); Assert.AreEqual(session1.SessionId, session2.SessionId); diff --git a/Tests/Opc.Ua.Client.Tests/ClientTestFramework.cs b/Tests/Opc.Ua.Client.Tests/ClientTestFramework.cs index 8e72573a6..69bb37472 100644 --- a/Tests/Opc.Ua.Client.Tests/ClientTestFramework.cs +++ b/Tests/Opc.Ua.Client.Tests/ClientTestFramework.cs @@ -60,6 +60,7 @@ public class ClientTestFramework public TokenValidatorMock TokenValidator { get; set; } = new TokenValidatorMock(); public bool SingleSession { get; set; } = true; + public int MaxChannelCount { get; set; } = 10; public bool SupportsExternalServerUrl { get; set; } = false; public ServerFixture ServerFixture { get; set; } public ClientFixture ClientFixture { get; set; } @@ -135,13 +136,13 @@ public async Task OneTimeSetUpAsync(TextWriter writer = null, bool securityNone if (customUrl == null) { // start Ref server - ServerFixture = new ServerFixture(enableTracing, disableActivityLogging) - { + ServerFixture = new ServerFixture(enableTracing, disableActivityLogging) { UriScheme = UriScheme, SecurityNone = securityNone, AutoAccept = true, AllNodeManagers = true, - OperationLimits = true + OperationLimits = true, + MaxChannelCount = MaxChannelCount, }; if (writer != null) diff --git a/Tests/Opc.Ua.Client.Tests/SubscriptionTest.cs b/Tests/Opc.Ua.Client.Tests/SubscriptionTest.cs index b2fe2f28e..cd7e0fc13 100644 --- a/Tests/Opc.Ua.Client.Tests/SubscriptionTest.cs +++ b/Tests/Opc.Ua.Client.Tests/SubscriptionTest.cs @@ -62,6 +62,7 @@ public new Task OneTimeSetUp() SupportsExternalServerUrl = true; // create a new session for every test SingleSession = false; + MaxChannelCount = 1000; return base.OneTimeSetUpAsync(null, true); } @@ -503,6 +504,7 @@ public void SequentialPublishingSubscription(bool enabled) // the active channel ISession session1 = await ClientFixture.ConnectAsync(endpoint, userIdentity).ConfigureAwait(false); Assert.NotNull(session1); + var sessionId1 = session1.SessionId; int session1ConfigChanged = 0; session1.SessionConfigurationChanged += (object sender, EventArgs e) => { session1ConfigChanged++; }; @@ -616,80 +618,85 @@ public void SequentialPublishingSubscription(bool enabled) await Task.Delay(2 * kDelay).ConfigureAwait(false); - Assert.AreEqual(session1.SessionId, session2.SessionId); - - if (asyncTest) - { - DataValue value2 = await session2.ReadValueAsync(VariableIds.Server_ServerStatus).ConfigureAwait(false); - Assert.NotNull(value2); - } - else - { - ServerStatusDataType value2 = (ServerStatusDataType)session2.ReadValue(VariableIds.Server_ServerStatus, typeof(ServerStatusDataType)); - Assert.NotNull(value2); - } - - for (ii = 0; ii < kTestSubscriptions; ii++) + try { - var monitoredItemCount = restoredSubscriptions[ii].MonitoredItemCount; - string errorText = $"Error in test subscription {ii}"; + Assert.AreEqual(sessionId1, session2.SessionId); - // the static subscription doesn't resend data until there is a data change - if (ii == 0 && !sendInitialValues) + if (asyncTest) { - Assert.AreEqual(0, targetSubscriptionCounters[ii], errorText); - Assert.AreEqual(0, targetSubscriptionFastDataCounters[ii], errorText); + DataValue value2 = await session2.ReadValueAsync(VariableIds.Server_ServerStatus).ConfigureAwait(false); + Assert.NotNull(value2); } - else if (ii == 0) + else { - Assert.AreEqual(monitoredItemCount, targetSubscriptionCounters[ii], errorText); - Assert.AreEqual(1, targetSubscriptionFastDataCounters[ii], errorText); + ServerStatusDataType value2 = (ServerStatusDataType)session2.ReadValue(VariableIds.Server_ServerStatus, typeof(ServerStatusDataType)); + Assert.NotNull(value2); } - else + + for (ii = 0; ii < kTestSubscriptions; ii++) { - Assert.LessOrEqual(monitoredItemCount, targetSubscriptionCounters[ii], errorText); - Assert.LessOrEqual(1, targetSubscriptionFastDataCounters[ii], errorText); + var monitoredItemCount = restoredSubscriptions[ii].MonitoredItemCount; + string errorText = $"Error in test subscription {ii}"; + + // the static subscription doesn't resend data until there is a data change + if (ii == 0 && !sendInitialValues) + { + Assert.AreEqual(0, targetSubscriptionCounters[ii], errorText); + Assert.AreEqual(0, targetSubscriptionFastDataCounters[ii], errorText); + } + else if (ii == 0) + { + Assert.AreEqual(monitoredItemCount, targetSubscriptionCounters[ii], errorText); + Assert.AreEqual(1, targetSubscriptionFastDataCounters[ii], errorText); + } + else + { + Assert.LessOrEqual(monitoredItemCount, targetSubscriptionCounters[ii], errorText); + Assert.LessOrEqual(1, targetSubscriptionFastDataCounters[ii], errorText); + } } - } - await Task.Delay(kDelay).ConfigureAwait(false); + await Task.Delay(kDelay).ConfigureAwait(false); - // verify that reconnect created subclassed version of subscription and monitored item - foreach (var s in session2.Subscriptions) - { - Assert.AreEqual(typeof(TestableSubscription), s.GetType()); - foreach (var m in s.MonitoredItems) + // verify that reconnect created subclassed version of subscription and monitored item + foreach (var s in session2.Subscriptions) { - Assert.AreEqual(typeof(TestableMonitoredItem), m.GetType()); + Assert.AreEqual(typeof(TestableSubscription), s.GetType()); + foreach (var m in s.MonitoredItems) + { + Assert.AreEqual(typeof(TestableMonitoredItem), m.GetType()); + } } - } - - // cannot read using a closed channel, validate the status code - if (endpoint.EndpointUrl.ToString().StartsWith(Utils.UriSchemeOpcTcp, StringComparison.Ordinal)) - { - sre = Assert.Throws(() => session1.ReadValue(VariableIds.Server_ServerStatus, typeof(ServerStatusDataType))); - Assert.AreEqual((StatusCode)StatusCodes.BadSecureChannelIdInvalid, (StatusCode)sre.StatusCode, sre.Message); - } - else - { - var result = session1.ReadValue(VariableIds.Server_ServerStatus, typeof(ServerStatusDataType)); - Assert.NotNull(result); - } - session1.DeleteSubscriptionsOnClose = true; - session2.DeleteSubscriptionsOnClose = true; - if (asyncTest) - { - await session1.CloseAsync(1000).ConfigureAwait(false); - await session2.CloseAsync(1000).ConfigureAwait(false); + // cannot read using a closed channel, validate the status code + if (endpoint.EndpointUrl.ToString().StartsWith(Utils.UriSchemeOpcTcp, StringComparison.Ordinal)) + { + sre = Assert.Throws(() => session1.ReadValue(VariableIds.Server_ServerStatus, typeof(ServerStatusDataType))); + Assert.AreEqual((StatusCode)StatusCodes.BadSecureChannelClosed, (StatusCode)sre.StatusCode, sre.Message); + } + else + { + var result = session1.ReadValue(VariableIds.Server_ServerStatus, typeof(ServerStatusDataType)); + Assert.NotNull(result); + } } - else + finally { - session1.Close(1000); - session2.Close(1000); + session1.DeleteSubscriptionsOnClose = true; + session2.DeleteSubscriptionsOnClose = true; + if (asyncTest) + { + await session1.CloseAsync(1000, true).ConfigureAwait(false); + await session2.CloseAsync(1000, true).ConfigureAwait(false); + } + else + { + session1.Close(1000, true); + session2.Close(1000, true); + } + Utils.SilentDispose(session1); + Utils.SilentDispose(session2); } - Utils.SilentDispose(session1); - Utils.SilentDispose(session2); Assert.AreEqual(0, session1ConfigChanged); Assert.Less(0, session2ConfigChanged); diff --git a/Tests/Opc.Ua.Core.Tests/Properties/launchSettings.json b/Tests/Opc.Ua.Core.Tests/Properties/launchSettings.json new file mode 100644 index 000000000..6ad8bfb8d --- /dev/null +++ b/Tests/Opc.Ua.Core.Tests/Properties/launchSettings.json @@ -0,0 +1,8 @@ +{ + "profiles": { + "Opc.Ua.Core.Tests": { + "commandName": "Project", + "commandLineArgs": "--runtimes net48 net6.0 net8.0" + } + } +} \ No newline at end of file diff --git a/Tests/Opc.Ua.Server.Tests/ServerFixture.cs b/Tests/Opc.Ua.Server.Tests/ServerFixture.cs index 798679120..7e7fb76bb 100644 --- a/Tests/Opc.Ua.Server.Tests/ServerFixture.cs +++ b/Tests/Opc.Ua.Server.Tests/ServerFixture.cs @@ -48,6 +48,7 @@ public class ServerFixture where T : ServerBase, new() public bool LogConsole { get; set; } public bool AutoAccept { get; set; } public bool OperationLimits { get; set; } + public int MaxChannelCount { get; set; } = 10; public int ReverseConnectTimeout { get; set; } public bool AllNodeManagers { get; set; } public int TraceMasks { get; set; } = Utils.TraceMasks.Error | Utils.TraceMasks.StackTrace | Utils.TraceMasks.Security | Utils.TraceMasks.Information; @@ -129,10 +130,10 @@ public async Task LoadConfiguration(string pkiRoot = null) }); } - serverConfig.SetMaxMessageQueueSize(20); - serverConfig.SetDiagnosticsEnabled(true); - serverConfig.SetAuditingEnabled(true); - serverConfig.SetMaxChannelCount(10); + serverConfig.SetMaxChannelCount(MaxChannelCount) + .SetMaxMessageQueueSize(20) + .SetDiagnosticsEnabled(true) + .SetAuditingEnabled(true); if (ReverseConnectTimeout != 0) { From e03181d64dfe3605885e5efaffc36892b3c6aa07 Mon Sep 17 00:00:00 2001 From: Martin Regen Date: Tue, 14 May 2024 12:05:32 +0200 Subject: [PATCH 08/10] add reconnectasync tests --- Tests/Opc.Ua.Client.Tests/ClientTest.cs | 33 ++++++++++++++++++++----- 1 file changed, 27 insertions(+), 6 deletions(-) diff --git a/Tests/Opc.Ua.Client.Tests/ClientTest.cs b/Tests/Opc.Ua.Client.Tests/ClientTest.cs index f92db031c..3425f260b 100644 --- a/Tests/Opc.Ua.Client.Tests/ClientTest.cs +++ b/Tests/Opc.Ua.Client.Tests/ClientTest.cs @@ -285,7 +285,7 @@ public void ReadOnDiscoveryChannel(int readCount) // client may report channel closed instead of security policy rejected if (StatusCodes.BadSecureChannelClosed == sre.StatusCode) { - Assert.Inconclusive($"Unexpected Status: {sre}" ); + Assert.Inconclusive($"Unexpected Status: {sre}"); } Assert.AreEqual((StatusCode)StatusCodes.BadSecurityPolicyRejected, (StatusCode)sre.StatusCode, "Unexpected Status: {0}", sre); } @@ -314,7 +314,7 @@ public void GetEndpointsOnDiscoveryChannel() // client may report channel closed instead of security policy rejected if (StatusCodes.BadSecureChannelClosed == sre.StatusCode) { - Assert.Inconclusive($"Unexpected Status: {sre}" ); + Assert.Inconclusive($"Unexpected Status: {sre}"); } Assert.AreEqual((StatusCode)StatusCodes.BadSecurityPolicyRejected, (StatusCode)sre.StatusCode, "Unexpected Status: {0}", sre); } @@ -614,7 +614,7 @@ public async Task ConnectMultipleSessionsAsync() /// Close the first channel before or after the new channel is activated. /// [Theory, Order(250)] - public async Task ReconnectSessionOnAlternateChannel(bool closeChannel) + public async Task ReconnectSessionOnAlternateChannel(bool closeChannel, bool asyncReconnect) { ServiceResultException sre; @@ -649,7 +649,14 @@ public async Task ReconnectSessionOnAlternateChannel(bool closeChannel) Assert.NotNull(channel2); // activate the session on the new channel - session1.Reconnect(channel2); + if (asyncReconnect) + { + await session1.ReconnectAsync(channel2, CancellationToken.None).ConfigureAwait(false); + } + else + { + session1.Reconnect(channel2); + } // test by reading a value ServerStatusDataType value2 = (ServerStatusDataType)session1.ReadValue(VariableIds.Server_ServerStatus, typeof(ServerStatusDataType)); @@ -669,14 +676,28 @@ public async Task ReconnectSessionOnAlternateChannel(bool closeChannel) Assert.AreEqual(value1.State, value3.State); // close the session, keep the channel open - session1.Close(closeChannel: false); + if (asyncReconnect) + { + await session1.CloseAsync(closeChannel: false, CancellationToken.None).ConfigureAwait(false); + } + else + { + session1.Close(closeChannel: false); + } // cannot read using a closed session, validate the status code sre = Assert.Throws(() => session1.ReadValue(VariableIds.Server_ServerStatus, typeof(ServerStatusDataType))); Assert.AreEqual((StatusCode)StatusCodes.BadSessionIdInvalid, (StatusCode)sre.StatusCode, sre.Message); // close the channel - channel2.Close(); + if (asyncReconnect) + { + await channel2.CloseAsync(CancellationToken.None).ConfigureAwait(false); + } + else + { + channel2.Close(); + } channel2.Dispose(); // cannot read using a closed channel, validate the status code From e7780cd4951165392c860fcb24154b43efe27c0c Mon Sep 17 00:00:00 2001 From: Martin Regen Date: Tue, 14 May 2024 12:14:16 +0200 Subject: [PATCH 09/10] remove launchsettings --- Tests/Opc.Ua.Core.Tests/Properties/launchSettings.json | 8 -------- 1 file changed, 8 deletions(-) delete mode 100644 Tests/Opc.Ua.Core.Tests/Properties/launchSettings.json diff --git a/Tests/Opc.Ua.Core.Tests/Properties/launchSettings.json b/Tests/Opc.Ua.Core.Tests/Properties/launchSettings.json deleted file mode 100644 index 6ad8bfb8d..000000000 --- a/Tests/Opc.Ua.Core.Tests/Properties/launchSettings.json +++ /dev/null @@ -1,8 +0,0 @@ -{ - "profiles": { - "Opc.Ua.Core.Tests": { - "commandName": "Project", - "commandLineArgs": "--runtimes net48 net6.0 net8.0" - } - } -} \ No newline at end of file From e61785b4d9a623ccd0d848b9425bf4b23df37c7b Mon Sep 17 00:00:00 2001 From: Martin Regen Date: Tue, 14 May 2024 16:54:30 +0200 Subject: [PATCH 10/10] code review fixes --- .../ConsoleReferenceClient/ClientSamples.cs | 6 ++- .../ConsoleReferenceClient/Program.cs | 40 +++++++++++++++---- Libraries/Opc.Ua.Client/Session.cs | 6 +-- Tests/Opc.Ua.Client.Tests/ClientTest.cs | 2 - 4 files changed, 40 insertions(+), 14 deletions(-) diff --git a/Applications/ConsoleReferenceClient/ClientSamples.cs b/Applications/ConsoleReferenceClient/ClientSamples.cs index 5730c01c8..4291642e5 100644 --- a/Applications/ConsoleReferenceClient/ClientSamples.cs +++ b/Applications/ConsoleReferenceClient/ClientSamples.cs @@ -699,7 +699,7 @@ public void SubscribeToDataChanges(ISession session, uint minLifeTime) /// Outputs elapsed time information for perf testing and lists all /// types that were successfully added to the session encodeable type factory. /// - public async Task LoadTypeSystemAsync(ISession session) + public async Task LoadTypeSystemAsync(ISession session) { m_output.WriteLine("Load the server type system."); @@ -732,6 +732,8 @@ public async Task LoadTypeSystemAsync(ISession session) } } } + + return complexTypeSystem; } #endregion @@ -900,7 +902,7 @@ Task FetchReferenceIdTypesAsync(ISession session) StartNodeId = item.NodeId, AttributeId = Attributes.Value, SamplingInterval = samplingInterval, - DisplayName = item.DisplayName?.Text ?? item.BrowseName.Name, + DisplayName = item.DisplayName?.Text ?? item.BrowseName?.Name ?? "unknown", QueueSize = queueSize, DiscardOldest = true, MonitoringMode = MonitoringMode.Reporting, diff --git a/Applications/ConsoleReferenceClient/Program.cs b/Applications/ConsoleReferenceClient/Program.cs index 34cbccb86..e0b3cde33 100644 --- a/Applications/ConsoleReferenceClient/Program.cs +++ b/Applications/ConsoleReferenceClient/Program.cs @@ -234,7 +234,7 @@ public static async Task Main(string[] args) var samples = new ClientSamples(output, ClientBase.ValidateResponse, quitEvent, verbose); if (loadTypes) { - await samples.LoadTypeSystemAsync(uaClient.Session).ConfigureAwait(false); + var complexTypeSystem = await samples.LoadTypeSystemAsync(uaClient.Session).ConfigureAwait(false); } if (browseall || fetchall || jsonvalues) @@ -266,8 +266,8 @@ public static async Task Main(string[] args) if (subscribe && (browseall || fetchall)) { - // subscribe to 100 random variables - const int MaxVariables = 100; + // subscribe to 1000 random variables + const int MaxVariables = 1000; NodeCollection variables = new NodeCollection(); Random random = new Random(62541); if (fetchall) @@ -291,15 +291,41 @@ public static async Task Main(string[] args) await samples.SubscribeAllValuesAsync(uaClient, variableIds: new NodeCollection(variables), - samplingInterval: 1000, - publishingInterval: 5000, + samplingInterval: 100, + publishingInterval: 1000, queueSize: 10, - lifetimeCount: 12, + lifetimeCount: 60, keepAliveCount: 2).ConfigureAwait(false); // Wait for DataChange notifications from MonitoredItems output.WriteLine("Subscribed to {0} variables. Press Ctrl-C to exit.", MaxVariables); - quit = quitEvent.WaitOne(timeout > 0 ? waitTime : Timeout.Infinite); + + // free unused memory + uaClient.Session.NodeCache.Clear(); + + waitTime = timeout - (int)DateTime.UtcNow.Subtract(start).TotalMilliseconds; + DateTime endTime = waitTime > 0 ? DateTime.UtcNow.Add(TimeSpan.FromMilliseconds(waitTime)) : DateTime.MaxValue; + var variableIterator = variables.GetEnumerator(); + while (!quit && endTime > DateTime.UtcNow) + { + if (variableIterator.MoveNext()) + { + try + { + var value = await uaClient.Session.ReadValueAsync(variableIterator.Current.NodeId).ConfigureAwait(false); + output.WriteLine("Value of {0} is {1}", variableIterator.Current.NodeId, value); + } + catch (Exception ex) + { + output.WriteLine("Error reading value of {0}: {1}", variableIterator.Current.NodeId, ex.Message); + } + } + else + { + variableIterator = variables.GetEnumerator(); + } + quit = quitEvent.WaitOne(500); + } } else { diff --git a/Libraries/Opc.Ua.Client/Session.cs b/Libraries/Opc.Ua.Client/Session.cs index 9bf81470c..4d06e56ab 100644 --- a/Libraries/Opc.Ua.Client/Session.cs +++ b/Libraries/Opc.Ua.Client/Session.cs @@ -4027,13 +4027,13 @@ protected virtual void OnKeepAlive(ServerState currentState, DateTime currentTim /// protected virtual bool OnKeepAliveError(ServiceResult result) { - long ticks = DateTime.UtcNow.Ticks; + DateTime now = DateTime.UtcNow; m_lastKeepAliveErrorStatusCode = result.StatusCode; if (result.StatusCode == StatusCodes.BadNoCommunication) { // keep alive read timed out - long delta = ticks - Interlocked.Read(ref m_lastKeepAliveTime); + long delta = now.Ticks - Interlocked.Read(ref m_lastKeepAliveTime); Utils.LogInfo( "KEEP ALIVE LATE: {0}s, EndpointUrl={1}, RequestCount={2}/{3}", ((double)delta) / TimeSpan.TicksPerSecond, @@ -4048,7 +4048,7 @@ protected virtual bool OnKeepAliveError(ServiceResult result) { try { - KeepAliveEventArgs args = new KeepAliveEventArgs(result, ServerState.Unknown, DateTime.UtcNow); + KeepAliveEventArgs args = new KeepAliveEventArgs(result, ServerState.Unknown, now); callback(this, args); return !args.CancelKeepAlive; } diff --git a/Tests/Opc.Ua.Client.Tests/ClientTest.cs b/Tests/Opc.Ua.Client.Tests/ClientTest.cs index 3425f260b..65196b005 100644 --- a/Tests/Opc.Ua.Client.Tests/ClientTest.cs +++ b/Tests/Opc.Ua.Client.Tests/ClientTest.cs @@ -32,14 +32,12 @@ using System.Diagnostics; using System.IO; using System.Linq; -using System.Runtime.Serialization; using System.Security.Cryptography.X509Certificates; using System.Text; using System.Threading; using System.Threading.Tasks; using BenchmarkDotNet.Attributes; using Moq; -using Newtonsoft.Json.Linq; using NUnit.Framework; using Opc.Ua.Bindings; using Opc.Ua.Configuration;