From c5a3f49c88d3d907a56ec8d18f783426de5144e9 Mon Sep 17 00:00:00 2001 From: Miha Zupan Date: Fri, 25 Sep 2020 20:39:19 +0200 Subject: [PATCH] Fix System.Net.Sockets telemetry (#42726) --- .../Diagnostics/Tracing/TestEventListener.cs | 109 ++--- .../src/System/Net/Dns.cs | 48 +- .../src/System/Net/Sockets/Socket.cs | 175 +++++-- .../Sockets/SocketAsyncEventArgs.Windows.cs | 12 + .../Net/Sockets/SocketAsyncEventArgs.cs | 64 +-- .../System/Net/Sockets/SocketsTelemetry.cs | 98 ++-- .../tests/FunctionalTests/TelemetryTest.cs | 439 ++++++++++++++++-- 7 files changed, 723 insertions(+), 222 deletions(-) diff --git a/src/libraries/Common/tests/System/Diagnostics/Tracing/TestEventListener.cs b/src/libraries/Common/tests/System/Diagnostics/Tracing/TestEventListener.cs index b12f5022c62f1..74c0d9fa9395a 100644 --- a/src/libraries/Common/tests/System/Diagnostics/Tracing/TestEventListener.cs +++ b/src/libraries/Common/tests/System/Diagnostics/Tracing/TestEventListener.cs @@ -9,90 +9,91 @@ namespace System.Diagnostics.Tracing /// Simple event listener than invokes a callback for each event received. internal sealed class TestEventListener : EventListener { - private readonly string _targetSourceName; - private readonly Guid _targetSourceGuid; - private readonly EventLevel _level; + private class Settings + { + public EventLevel Level; + public EventKeywords Keywords; + } + + private readonly Dictionary _names = new Dictionary(); + private readonly Dictionary _guids = new Dictionary(); + private readonly double? _eventCounterInterval; private Action _eventWritten; - private List _tmpEventSourceList = new List(); + private readonly List _eventSourceList = new List(); public TestEventListener(string targetSourceName, EventLevel level, double? eventCounterInterval = null) { - // Store the arguments - _targetSourceName = targetSourceName; - _level = level; _eventCounterInterval = eventCounterInterval; - - LoadSourceList(); + AddSource(targetSourceName, level); } public TestEventListener(Guid targetSourceGuid, EventLevel level, double? eventCounterInterval = null) { - // Store the arguments - _targetSourceGuid = targetSourceGuid; - _level = level; _eventCounterInterval = eventCounterInterval; - - LoadSourceList(); + AddSource(targetSourceGuid, level); } - private void LoadSourceList() + public void AddSource(string name, EventLevel level, EventKeywords keywords = EventKeywords.All) => + AddSource(name, null, level, keywords); + + public void AddSource(Guid guid, EventLevel level, EventKeywords keywords = EventKeywords.All) => + AddSource(null, guid, level, keywords); + + private void AddSource(string name, Guid? guid, EventLevel level, EventKeywords keywords) { - // The base constructor, which is called before this constructor, - // will invoke the virtual OnEventSourceCreated method for each - // existing EventSource, which means OnEventSourceCreated will be - // called before _targetSourceGuid and _level have been set. As such, - // we store a temporary list that just exists from the moment this instance - // is created (instance field initializers run before the base constructor) - // and until we finish construction... in that window, OnEventSourceCreated - // will store the sources into the list rather than try to enable them directly, - // and then here we can enumerate that list, then clear it out. - List sources; - lock (_tmpEventSourceList) + lock (_eventSourceList) { - sources = _tmpEventSourceList; - _tmpEventSourceList = null; - } - foreach (EventSource source in sources) - { - EnableSourceIfMatch(source); + var settings = new Settings() + { + Level = level, + Keywords = keywords + }; + + if (name is not null) + _names.Add(name, settings); + + if (guid.HasValue) + _guids.Add(guid.Value, settings); + + foreach (EventSource source in _eventSourceList) + { + if (name == source.Name || guid == source.Guid) + { + EnableEventSource(source, level, keywords); + } + } } } + public void AddActivityTracking() => + AddSource("System.Threading.Tasks.TplEventSource", EventLevel.Informational, (EventKeywords)0x80 /* TasksFlowActivityIds */); + protected override void OnEventSourceCreated(EventSource eventSource) { - List tmp = _tmpEventSourceList; - if (tmp != null) + lock (_eventSourceList) { - lock (tmp) + _eventSourceList.Add(eventSource); + + if (_names.TryGetValue(eventSource.Name, out Settings settings) || + _guids.TryGetValue(eventSource.Guid, out settings)) { - if (_tmpEventSourceList != null) - { - _tmpEventSourceList.Add(eventSource); - return; - } + EnableEventSource(eventSource, settings.Level, settings.Keywords); } } - - EnableSourceIfMatch(eventSource); } - private void EnableSourceIfMatch(EventSource source) + private void EnableEventSource(EventSource source, EventLevel level, EventKeywords keywords) { - if (source.Name.Equals(_targetSourceName) || - source.Guid.Equals(_targetSourceGuid)) + var args = new Dictionary(); + + if (_eventCounterInterval != null) { - if (_eventCounterInterval != null) - { - var args = new Dictionary { { "EventCounterIntervalSec", _eventCounterInterval?.ToString() } }; - EnableEvents(source, _level, EventKeywords.All, args); - } - else - { - EnableEvents(source, _level); - } + args.Add("EventCounterIntervalSec", _eventCounterInterval.ToString()); } + + EnableEvents(source, level, keywords, args); } public void RunWithCallback(Action handler, Action body) diff --git a/src/libraries/System.Net.NameResolution/src/System/Net/Dns.cs b/src/libraries/System.Net.NameResolution/src/System/Net/Dns.cs index 98cb7ccbc0b55..60cc964ec08ce 100644 --- a/src/libraries/System.Net.NameResolution/src/System/Net/Dns.cs +++ b/src/libraries/System.Net.NameResolution/src/System/Net/Dns.cs @@ -466,34 +466,9 @@ private static Task GetHostEntryOrAddressesCoreAsync(string hostName, bool justR if (NameResolutionTelemetry.Log.IsEnabled()) { - ValueStopwatch stopwatch = NameResolutionTelemetry.Log.BeforeResolution(hostName); - - Task coreTask; - try - { - coreTask = NameResolutionPal.GetAddrInfoAsync(hostName, justAddresses); - } - catch when (LogFailure(stopwatch)) - { - Debug.Fail("LogFailure should return false"); - throw; - } - - coreTask.ContinueWith( - (task, state) => - { - NameResolutionTelemetry.Log.AfterResolution( - stopwatch: (ValueStopwatch)state!, - successful: task.IsCompletedSuccessfully); - }, - state: stopwatch, - cancellationToken: default, - TaskContinuationOptions.ExecuteSynchronously, - TaskScheduler.Default); - - // coreTask is not actually a base Task, but Task / Task - // We have to return it and not the continuation - return coreTask; + return justAddresses + ? (Task)GetAddrInfoWithTelemetryAsync(hostName, justAddresses) + : (Task)GetAddrInfoWithTelemetryAsync(hostName, justAddresses); } else { @@ -506,6 +481,23 @@ private static Task GetHostEntryOrAddressesCoreAsync(string hostName, bool justR RunAsync(s => GetHostEntryCore((string)s), hostName); } + private static async Task GetAddrInfoWithTelemetryAsync(string hostName, bool justAddresses) + where T : class + { + ValueStopwatch stopwatch = NameResolutionTelemetry.Log.BeforeResolution(hostName); + + T? result = null; + try + { + result = await ((Task)NameResolutionPal.GetAddrInfoAsync(hostName, justAddresses)).ConfigureAwait(false); + return result; + } + finally + { + NameResolutionTelemetry.Log.AfterResolution(stopwatch, successful: result is not null); + } + } + private static Task RunAsync(Func func, object arg) => Task.Factory.StartNew(func!, arg, CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default); diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.cs index 05f90292ff9db..c9b2bfd5eea36 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.cs @@ -1091,25 +1091,37 @@ public Socket Accept() // This may throw ObjectDisposedException. SafeSocketHandle acceptedSocketHandle; - SocketError errorCode = SocketPal.Accept( - _handle, - socketAddress.Buffer, - ref socketAddress.InternalSize, - out acceptedSocketHandle); + SocketError errorCode; + try + { + errorCode = SocketPal.Accept( + _handle, + socketAddress.Buffer, + ref socketAddress.InternalSize, + out acceptedSocketHandle); + } + catch (Exception ex) + { + if (SocketsTelemetry.Log.IsEnabled()) + { + SocketsTelemetry.Log.AfterAccept(SocketError.Interrupted, ex.Message); + } + + throw; + } // Throw an appropriate SocketException if the native call fails. if (errorCode != SocketError.Success) { - if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AcceptFailedAndStop(errorCode, null); - Debug.Assert(acceptedSocketHandle.IsInvalid); UpdateAcceptSocketErrorForDisposed(ref errorCode); + + if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AfterAccept(errorCode); + UpdateStatusAfterSocketErrorAndThrowException(errorCode); } - else - { - if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AcceptStop(); - } + + if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AfterAccept(SocketError.Success); Debug.Assert(!acceptedSocketHandle.IsInvalid); @@ -2127,8 +2139,6 @@ private bool CanUseConnectEx(EndPoint remoteEP) internal IAsyncResult UnsafeBeginConnect(EndPoint remoteEP, AsyncCallback? callback, object? state, bool flowContext = false) { - if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.ConnectStart(remoteEP); - if (CanUseConnectEx(remoteEP)) { return BeginConnectEx(remoteEP, flowContext, callback, state); @@ -2348,7 +2358,23 @@ public void Disconnect(bool reuseSocket) // int - Return code from async Connect, 0 for success, SocketError.NotConnected otherwise public void EndConnect(IAsyncResult asyncResult) { - ThrowIfDisposed(); + // There are three AsyncResult types we support in EndConnect: + // - ConnectAsyncResult - a fully synchronous operation that already completed, wrapped in an AsyncResult + // - MultipleAddressConnectAsyncResult - a parent operation for other Connects (connecting to DnsEndPoint) + // - ConnectOverlappedAsyncResult - a connect to an IPEndPoint + // For Telemetry, we already logged everything for ConnectAsyncResult in DoConnect, + // and we want to avoid logging duplicated events for MultipleAddressConnect. + // Therefore, we always check that asyncResult is ConnectOverlapped before logging. + + if (Disposed) + { + if (SocketsTelemetry.Log.IsEnabled() && asyncResult is ConnectOverlappedAsyncResult) + { + SocketsTelemetry.Log.AfterConnect(SocketError.NotSocket); + } + + ThrowObjectDisposedException(); + } // Validate input parameters. if (asyncResult == null) @@ -2376,13 +2402,13 @@ public void EndConnect(IAsyncResult asyncResult) if (NetEventSource.Log.IsEnabled()) NetEventSource.Info(this, $"asyncResult:{asyncResult}"); Exception? ex = castedAsyncResult.Result as Exception; + if (ex != null || (SocketError)castedAsyncResult.ErrorCode != SocketError.Success) { - if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.ConnectFailedAndStop((SocketError)castedAsyncResult.ErrorCode, ex?.Message); + SocketError errorCode = (SocketError)castedAsyncResult.ErrorCode; if (ex == null) { - SocketError errorCode = (SocketError)castedAsyncResult.ErrorCode; UpdateConnectSocketErrorForDisposed(ref errorCode); // Update the internal state of this socket according to the error before throwing. SocketException se = SocketExceptionFactory.CreateSocketException((int)errorCode, castedAsyncResult.RemoteEndPoint); @@ -2390,11 +2416,19 @@ public void EndConnect(IAsyncResult asyncResult) ex = se; } + if (SocketsTelemetry.Log.IsEnabled() && castedAsyncResult is ConnectOverlappedAsyncResult) + { + SocketsTelemetry.Log.AfterConnect(errorCode, ex.Message); + } + if (NetEventSource.Log.IsEnabled()) NetEventSource.Error(this, ex); ExceptionDispatchInfo.Throw(ex); } - if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.ConnectStop(); + if (SocketsTelemetry.Log.IsEnabled() && castedAsyncResult is ConnectOverlappedAsyncResult) + { + SocketsTelemetry.Log.AfterConnect(SocketError.Success); + } if (NetEventSource.Log.IsEnabled()) NetEventSource.Connected(this, LocalEndPoint, RemoteEndPoint); } @@ -3512,21 +3546,33 @@ private IAsyncResult BeginAcceptCommon(Socket? acceptSocket, int receiveSize, As if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AcceptStart(_rightEndPoint); int socketAddressSize = GetAddressSize(_rightEndPoint); - SocketError errorCode = SocketPal.AcceptAsync(this, _handle, acceptHandle, receiveSize, socketAddressSize, asyncResult); + SocketError errorCode; + try + { + errorCode = SocketPal.AcceptAsync(this, _handle, acceptHandle, receiveSize, socketAddressSize, asyncResult); + } + catch (Exception ex) + { + if (SocketsTelemetry.Log.IsEnabled()) + { + SocketsTelemetry.Log.AfterAccept(SocketError.Interrupted, ex.Message); + } + + throw; + } if (NetEventSource.Log.IsEnabled()) NetEventSource.Info(this, $"AcceptAsync returns:{errorCode} {asyncResult}"); // Throw an appropriate SocketException if the native call fails synchronously. if (!CheckErrorAndUpdateStatus(errorCode)) { - if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AcceptFailedAndStop(errorCode, null); - UpdateAcceptSocketErrorForDisposed(ref errorCode); + + if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AfterAccept(errorCode); + throw new SocketException((int)errorCode); } - if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AcceptStop(); - // Finish the flow capture, maybe complete here. asyncResult.FinishPostingAsyncOp(ref Caches.AcceptClosureCache); @@ -3552,7 +3598,12 @@ public Socket EndAccept(IAsyncResult asyncResult) } private Socket EndAcceptCommon(out byte[]? buffer, out int bytesTransferred, IAsyncResult asyncResult) { - ThrowIfDisposed(); + if (Disposed) + { + if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AfterAccept(SocketError.Interrupted); + + ThrowObjectDisposedException(); + } // Validate input parameters. if (asyncResult == null) @@ -3573,21 +3624,23 @@ private Socket EndAcceptCommon(out byte[]? buffer, out int bytesTransferred, IAs bytesTransferred = (int)castedAsyncResult.BytesTransferred; buffer = castedAsyncResult.Buffer; + if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.BytesReceived(bytesTransferred); + castedAsyncResult.EndCalled = true; // Throw an appropriate SocketException if the native call failed asynchronously. SocketError errorCode = (SocketError)castedAsyncResult.ErrorCode; + if (errorCode != SocketError.Success) { - if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AcceptFailedAndStop(errorCode, null); - UpdateAcceptSocketErrorForDisposed(ref errorCode); + + if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AfterAccept(errorCode); + UpdateStatusAfterSocketErrorAndThrowException(errorCode); } - else - { - if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AcceptStop(); - } + + if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AfterAccept(SocketError.Success); if (NetEventSource.Log.IsEnabled()) NetEventSource.Accepted(socket, socket.RemoteEndPoint, socket.LocalEndPoint); return socket; @@ -3641,16 +3694,23 @@ public bool AcceptAsync(SocketAsyncEventArgs e) SafeSocketHandle? acceptHandle; e.AcceptSocket = GetOrCreateAcceptSocket(e.AcceptSocket, true, "AcceptSocket", out acceptHandle); + if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AcceptStart(_rightEndPoint!); + // Prepare for and make the native call. e.StartOperationCommon(this, SocketAsyncOperation.Accept); e.StartOperationAccept(); - SocketError socketError = SocketError.Success; + SocketError socketError; try { socketError = e.DoOperationAccept(this, _handle, acceptHandle); } - catch + catch (Exception ex) { + if (SocketsTelemetry.Log.IsEnabled()) + { + SocketsTelemetry.Log.AfterAccept(SocketError.Interrupted, ex.Message); + } + // Clear in-use flag on event args object. e.Complete(); throw; @@ -3741,12 +3801,17 @@ private bool ConnectAsync(SocketAsyncEventArgs e, bool userSocket) _rightEndPoint = endPointSnapshot; } + if (SocketsTelemetry.Log.IsEnabled()) + { + SocketsTelemetry.Log.ConnectStart(e._socketAddress!); + } + // Prepare for the native call. e.StartOperationCommon(this, SocketAsyncOperation.Connect); e.StartOperationConnect(multipleConnect: null, userSocket); // Make the native call. - SocketError socketError = SocketError.Success; + SocketError socketError; try { if (CanUseConnectEx(endPointSnapshot)) @@ -3759,8 +3824,13 @@ private bool ConnectAsync(SocketAsyncEventArgs e, bool userSocket) socketError = e.DoOperationConnect(this, _handle); } } - catch + catch (Exception ex) { + if (SocketsTelemetry.Log.IsEnabled()) + { + SocketsTelemetry.Log.AfterConnect(SocketError.NotSocket, ex.Message); + } + _rightEndPoint = oldEndPoint; // Clear in-use flag on event args object. @@ -4197,22 +4267,36 @@ private void DoConnect(EndPoint endPointSnapshot, Internals.SocketAddress socket { if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.ConnectStart(socketAddress); - SocketError errorCode = SocketPal.Connect(_handle, socketAddress.Buffer, socketAddress.Size); + SocketError errorCode; + try + { + errorCode = SocketPal.Connect(_handle, socketAddress.Buffer, socketAddress.Size); + } + catch (Exception ex) + { + if (SocketsTelemetry.Log.IsEnabled()) + { + SocketsTelemetry.Log.AfterConnect(SocketError.NotSocket, ex.Message); + } + + throw; + } // Throw an appropriate SocketException if the native call fails. if (errorCode != SocketError.Success) { - if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.ConnectFailedAndStop(errorCode, null); - UpdateConnectSocketErrorForDisposed(ref errorCode); // Update the internal state of this socket according to the error before throwing. SocketException socketException = SocketExceptionFactory.CreateSocketException((int)errorCode, endPointSnapshot); UpdateStatusAfterSocketError(socketException); if (NetEventSource.Log.IsEnabled()) NetEventSource.Error(this, socketException); + + if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AfterConnect(errorCode); + throw socketException; } - if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.ConnectStop(); + if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AfterConnect(SocketError.Success); if (_rightEndPoint == null) { @@ -4593,6 +4677,14 @@ private IAsyncResult BeginConnectEx(EndPoint remoteEP, bool flowContext, AsyncCa EndPoint endPointSnapshot = remoteEP; Internals.SocketAddress socketAddress = Serialize(ref endPointSnapshot); + if (SocketsTelemetry.Log.IsEnabled()) + { + SocketsTelemetry.Log.ConnectStart(socketAddress); + + // Ignore flowContext when using Telemetry to avoid losing Activity tracking + flowContext = true; + } + WildcardBindForConnectIfNecessary(endPointSnapshot.AddressFamily); // Allocate the async result and the event we'll pass to the thread pool. @@ -4615,8 +4707,13 @@ private IAsyncResult BeginConnectEx(EndPoint remoteEP, bool flowContext, AsyncCa { errorCode = SocketPal.ConnectAsync(this, _handle, socketAddress.Buffer, socketAddress.Size, asyncResult); } - catch + catch (Exception ex) { + if (SocketsTelemetry.Log.IsEnabled()) + { + SocketsTelemetry.Log.AfterConnect(SocketError.NotSocket, ex.Message); + } + // _rightEndPoint will always equal oldEndPoint. _rightEndPoint = oldEndPoint; throw; @@ -4636,6 +4733,8 @@ private IAsyncResult BeginConnectEx(EndPoint remoteEP, bool flowContext, AsyncCa // Update the internal state of this socket according to the error before throwing. _rightEndPoint = oldEndPoint; + if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AfterConnect(errorCode); + throw new SocketException((int)errorCode); } diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEventArgs.Windows.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEventArgs.Windows.cs index 6368666ec6bf4..52b5355be26ba 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEventArgs.Windows.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEventArgs.Windows.cs @@ -155,6 +155,9 @@ private unsafe SocketError ProcessIOCPResult(bool success, int bytesTransferred, // so we can set the results right now. FreeNativeOverlapped(overlapped); FinishOperationSyncSuccess(bytesTransferred, SocketFlags.None); + + if (SocketsTelemetry.Log.IsEnabled()) AfterConnectAcceptTelemetry(); + return SocketError.Success; } @@ -170,6 +173,9 @@ private unsafe SocketError ProcessIOCPResult(bool success, int bytesTransferred, // Completed synchronously with a failure. FreeNativeOverlapped(overlapped); FinishOperationSyncFailure(socketError, bytesTransferred, SocketFlags.None); + + if (SocketsTelemetry.Log.IsEnabled()) AfterConnectAcceptTelemetry(); + return socketError; } @@ -202,6 +208,9 @@ private unsafe SocketError ProcessIOCPResultWithSingleBufferHandle(SocketError s _singleBufferHandleState = SingleBufferHandleState.None; FreeNativeOverlapped(overlapped); FinishOperationSyncSuccess(bytesTransferred, SocketFlags.None); + + if (SocketsTelemetry.Log.IsEnabled()) AfterConnectAcceptTelemetry(); + return SocketError.Success; } @@ -218,6 +227,9 @@ private unsafe SocketError ProcessIOCPResultWithSingleBufferHandle(SocketError s _singleBufferHandleState = SingleBufferHandleState.None; FreeNativeOverlapped(overlapped); FinishOperationSyncFailure(socketError, bytesTransferred, SocketFlags.None); + + if (SocketsTelemetry.Log.IsEnabled()) AfterConnectAcceptTelemetry(); + return socketError; } diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEventArgs.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEventArgs.cs index 114a65d256d70..2a40157b190a3 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEventArgs.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEventArgs.cs @@ -198,11 +198,38 @@ public int BytesTransferred public event EventHandler? Completed; + private void OnCompletedInternal() + { + if (SocketsTelemetry.Log.IsEnabled()) + { + AfterConnectAcceptTelemetry(); + } + + OnCompleted(this); + } + protected virtual void OnCompleted(SocketAsyncEventArgs e) { Completed?.Invoke(e._currentSocket, e); } + private void AfterConnectAcceptTelemetry() + { + switch (LastOperation) + { + case SocketAsyncOperation.Accept: + SocketsTelemetry.Log.AfterAccept(SocketError); + break; + + case SocketAsyncOperation.Connect: + if (_multipleConnect is null) + { + SocketsTelemetry.Log.AfterConnect(SocketError); + } + break; + } + } + // DisconnectResuseSocket property. public bool DisconnectReuseSocket { @@ -420,7 +447,7 @@ internal void SetResults(Exception exception, int bytesTransferred, SocketFlags private static void ExecutionCallback(object? state) { var thisRef = (SocketAsyncEventArgs)state!; - thisRef.OnCompleted(thisRef); + thisRef.OnCompletedInternal(); } // Marks this object as no longer "in-use". Will also execute a Dispose deferred @@ -509,7 +536,9 @@ internal void StartOperationCommon(Socket? socket, SocketAsyncOperation operatio _currentSocket = socket; // Capture execution context if needed (it is unless explicitly disabled). - if (_flowExecutionContext) + // If Telemetry is enabled, make sure to capture the context if we're making a Connect or Accept call to preserve the activity + if (_flowExecutionContext || + (SocketsTelemetry.Log.IsEnabled() && (operation == SocketAsyncOperation.Connect || operation == SocketAsyncOperation.Accept))) { _context = ExecutionContext.Capture(); } @@ -547,8 +576,6 @@ internal void StartOperationAccept() _acceptBuffer = new byte[_acceptAddressBufferCount]; } } - - if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AcceptStart(_currentSocket!._rightEndPoint!); } internal void StartOperationConnect(MultipleConnectAsync? multipleConnect, bool userSocket) @@ -556,9 +583,6 @@ internal void StartOperationConnect(MultipleConnectAsync? multipleConnect, bool _multipleConnect = multipleConnect; _connectSocket = null; _userSocket = userSocket; - - // Log only the actual connect operation to a remote endpoint. - if (SocketsTelemetry.Log.IsEnabled() && multipleConnect == null) SocketsTelemetry.Log.ConnectStart(_socketAddress!); } internal void CancelConnectAsync() @@ -572,8 +596,6 @@ internal void CancelConnectAsync() } else { - if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.ConnectCanceledAndStop(); - // Otherwise we're doing a normal ConnectAsync - cancel it by closing the socket. // _currentSocket will only be null if _multipleConnect was set, so we don't have to check. if (_currentSocket == null) @@ -589,12 +611,6 @@ internal void FinishOperationSyncFailure(SocketError socketError, int bytesTrans { SetResults(socketError, bytesTransferred, flags); - if (SocketsTelemetry.Log.IsEnabled()) - { - if (_multipleConnect == null && _completedOperation == SocketAsyncOperation.Connect) SocketsTelemetry.Log.ConnectFailedAndStop(socketError, null); - if (_completedOperation == SocketAsyncOperation.Accept) SocketsTelemetry.Log.AcceptFailedAndStop(socketError, null); - } - // This will be null if we're doing a static ConnectAsync to a DnsEndPoint with AddressFamily.Unspecified; // the attempt socket will be closed anyways, so not updating the state is OK. // If we're doing a static ConnectAsync to an IPEndPoint, we need to dispose @@ -640,7 +656,7 @@ internal void FinishOperationAsyncFailure(SocketError socketError, int bytesTran if (context == null) { - OnCompleted(this); + OnCompletedInternal(); } else { @@ -656,7 +672,7 @@ internal void FinishConnectByNameAsyncFailure(Exception exception, int bytesTran if (context == null) { - OnCompleted(this); + OnCompletedInternal(); } else { @@ -677,7 +693,7 @@ internal void FinishWrapperConnectSuccess(Socket? connectSocket, int bytesTransf Complete(); if (context == null) { - OnCompleted(this); + OnCompletedInternal(); } else { @@ -715,13 +731,9 @@ internal void FinishOperationSyncSuccess(int bytesTransferred, SocketFlags flags } catch (ObjectDisposedException) { } } - - if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AcceptStop(); } else { - if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AcceptFailedAndStop(socketError, null); - SetResults(socketError, bytesTransferred, flags); _acceptSocket = null; _currentSocket.UpdateStatusAfterSocketError(socketError); @@ -741,16 +753,12 @@ internal void FinishOperationSyncSuccess(int bytesTransferred, SocketFlags flags catch (ObjectDisposedException) { } } - if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.ConnectStop(); - // Mark socket connected. _currentSocket!.SetToConnected(); _connectSocket = _currentSocket; } else { - if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.ConnectFailedAndStop(socketError, null); - SetResults(socketError, bytesTransferred, flags); _currentSocket!.UpdateStatusAfterSocketError(socketError); } @@ -814,7 +822,7 @@ internal void FinishOperationAsyncSuccess(int bytesTransferred, SocketFlags flag // Raise completion event. if (context == null) { - OnCompleted(this); + OnCompletedInternal(); } else { @@ -834,6 +842,8 @@ private void FinishOperationSync(SocketError socketError, int bytesTransferred, { FinishOperationSyncFailure(socketError, bytesTransferred, flags); } + + if (SocketsTelemetry.Log.IsEnabled()) AfterConnectAcceptTelemetry(); } private static void LogBytesTransferEvents(SocketType? socketType, SocketAsyncOperation operation, int bytesTransferred) diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketsTelemetry.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketsTelemetry.cs index f3430e7b7ed7d..7b7641f2a4003 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketsTelemetry.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketsTelemetry.cs @@ -1,6 +1,7 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. +using System.Diagnostics; using System.Diagnostics.Tracing; using System.Threading; @@ -26,17 +27,13 @@ internal sealed class SocketsTelemetry : EventSource private long _datagramsSent; [Event(1, Level = EventLevel.Informational)] - public void ConnectStart(string? address) + private void ConnectStart(string? address) { - Interlocked.Increment(ref _outgoingConnectionsEstablished); - if (IsEnabled(EventLevel.Informational, EventKeywords.All)) - { - WriteEvent(eventId: 1, address ?? ""); - } + WriteEvent(eventId: 1, address); } [Event(2, Level = EventLevel.Informational)] - public void ConnectStop() + private void ConnectStop() { if (IsEnabled(EventLevel.Informational, EventKeywords.All)) { @@ -45,105 +42,108 @@ public void ConnectStop() } [Event(3, Level = EventLevel.Error)] - public void ConnectFailed(SocketError error, string? exceptionMessage) + private void ConnectFailed(SocketError error, string? exceptionMessage) { if (IsEnabled(EventLevel.Error, EventKeywords.All)) { - WriteEvent(eventId: 3, (int)error, exceptionMessage ?? string.Empty); + WriteEvent(eventId: 3, (int)error, exceptionMessage); } } - [Event(4, Level = EventLevel.Warning)] - public void ConnectCanceled() + [Event(4, Level = EventLevel.Informational)] + private void AcceptStart(string? address) { - if (IsEnabled(EventLevel.Warning, EventKeywords.All)) - { - WriteEvent(eventId: 4); - } + WriteEvent(eventId: 4, address); } [Event(5, Level = EventLevel.Informational)] - public void AcceptStart(string? address) + private void AcceptStop() { - Interlocked.Increment(ref _incomingConnectionsEstablished); if (IsEnabled(EventLevel.Informational, EventKeywords.All)) { - WriteEvent(eventId: 5, address ?? ""); + WriteEvent(eventId: 5); } } - [Event(6, Level = EventLevel.Informational)] - public void AcceptStop() - { - if (IsEnabled(EventLevel.Informational, EventKeywords.All)) - { - WriteEvent(eventId: 6); - } - } - - [Event(7, Level = EventLevel.Error)] - public void AcceptFailed(SocketError error, string? exceptionMessage) + [Event(6, Level = EventLevel.Error)] + private void AcceptFailed(SocketError error, string? exceptionMessage) { if (IsEnabled(EventLevel.Error, EventKeywords.All)) { - WriteEvent(eventId: 7, (int)error, exceptionMessage ?? string.Empty); + WriteEvent(eventId: 6, (int)error, exceptionMessage); } } [NonEvent] public void ConnectStart(Internals.SocketAddress address) { - ConnectStart(address.ToString()); - } - - [NonEvent] - public void ConnectStart(EndPoint address) - { - ConnectStart(address.ToString()); + if (IsEnabled(EventLevel.Informational, EventKeywords.All)) + { + ConnectStart(address.ToString()); + } } [NonEvent] - public void ConnectCanceledAndStop() + public void AfterConnect(SocketError error, string? exceptionMessage = null) { - ConnectCanceled(); - ConnectStop(); - } + if (error == SocketError.Success) + { + Debug.Assert(exceptionMessage is null); + Interlocked.Increment(ref _outgoingConnectionsEstablished); + } + else + { + ConnectFailed(error, exceptionMessage); + } - [NonEvent] - public void ConnectFailedAndStop(SocketError error, string? exceptionMessage) - { - ConnectFailed(error, exceptionMessage); ConnectStop(); } [NonEvent] public void AcceptStart(Internals.SocketAddress address) { - AcceptStart(address.ToString()); + if (IsEnabled(EventLevel.Informational, EventKeywords.All)) + { + AcceptStart(address.ToString()); + } } [NonEvent] public void AcceptStart(EndPoint address) { - AcceptStart(address.ToString()); + if (IsEnabled(EventLevel.Informational, EventKeywords.All)) + { + AcceptStart(address.ToString()); + } } [NonEvent] - public void AcceptFailedAndStop(SocketError error, string? exceptionMessage) + public void AfterAccept(SocketError error, string? exceptionMessage = null) { - AcceptFailed(error, exceptionMessage); + if (error == SocketError.Success) + { + Debug.Assert(exceptionMessage is null); + Interlocked.Increment(ref _incomingConnectionsEstablished); + } + else + { + AcceptFailed(error, exceptionMessage); + } + AcceptStop(); } [NonEvent] public void BytesReceived(int count) { + Debug.Assert(count >= 0); Interlocked.Add(ref _bytesReceived, count); } [NonEvent] public void BytesSent(int count) { + Debug.Assert(count >= 0); Interlocked.Add(ref _bytesSent, count); } diff --git a/src/libraries/System.Net.Sockets/tests/FunctionalTests/TelemetryTest.cs b/src/libraries/System.Net.Sockets/tests/FunctionalTests/TelemetryTest.cs index 783fbf52e133e..8f834a5c112c3 100644 --- a/src/libraries/System.Net.Sockets/tests/FunctionalTests/TelemetryTest.cs +++ b/src/libraries/System.Net.Sockets/tests/FunctionalTests/TelemetryTest.cs @@ -1,14 +1,14 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. -using System.Collections; using System.Collections.Concurrent; using System.Collections.Generic; -using System.Diagnostics; using System.Diagnostics.Tracing; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Microsoft.DotNet.RemoteExecutor; +using Microsoft.DotNet.XUnitExtensions; using Xunit; using Xunit.Abstractions; @@ -35,16 +35,323 @@ public static void EventSource_ExistsWithCorrectId() Assert.NotEmpty(EventSource.GenerateManifest(esType, esType.Assembly.Location)); } + public static IEnumerable SocketMethods_MemberData() + { + yield return new[] { "Sync" }; + yield return new[] { "Task" }; + yield return new[] { "Apm" }; + yield return new[] { "Eap" }; + } + + public static IEnumerable SocketMethods_Matrix_MemberData() + { + return from connectMethod in SocketMethods_MemberData() + from acceptMethod in SocketMethods_MemberData() + select new[] { connectMethod[0], acceptMethod[0] }; + } + + public static IEnumerable SocketMethods_WithBools_MemberData() + { + return from connectMethod in SocketMethods_MemberData() + from useDnsEndPoint in new[] { true, false } + select new[] { connectMethod[0], useDnsEndPoint }; + } + + private static async Task GetRemoteEndPointAsync(string useDnsEndPointString, int port) + { + const string Address = "microsoft.com"; + + if (bool.Parse(useDnsEndPointString)) + { + return new DnsEndPoint(Address, port); + } + else + { + IPAddress ip = (await Dns.GetHostAddressesAsync(Address))[0]; + return new IPEndPoint(ip, port); + } + } + + // RemoteExecutor only supports simple argument types such as strings + // That's why we use this helper method instead of returning SocketHelperBases from MemberDatas directly + private static SocketHelperBase GetHelperBase(string socketMethod) + { + return socketMethod switch + { + "Sync" => new SocketHelperArraySync(), + "Task" => new SocketHelperTask(), + "Apm" => new SocketHelperApm(), + "Eap" => new SocketHelperEap(), + _ => throw new ArgumentException(socketMethod) + }; + } + + [OuterLoop] + [ConditionalTheory(typeof(RemoteExecutor), nameof(RemoteExecutor.IsSupported))] + [MemberData(nameof(SocketMethods_Matrix_MemberData))] + public void EventSource_SocketConnectsLoopback_LogsConnectAcceptStartStop(string connectMethod, string acceptMethod) + { + RemoteExecutor.Invoke(async (connectMethod, acceptMethod) => + { + using var listener = new TestEventListener("System.Net.Sockets", EventLevel.Verbose, 0.1); + listener.AddActivityTracking(); + + var events = new ConcurrentQueue<(EventWrittenEventArgs Event, Guid ActivityId)>(); + await listener.RunWithCallbackAsync(e => events.Enqueue((e, e.ActivityId)), async () => + { + using var server = new Socket(SocketType.Stream, ProtocolType.Tcp); + server.Bind(new IPEndPoint(IPAddress.Loopback, 0)); + server.Listen(); + + using var client = new Socket(SocketType.Stream, ProtocolType.Tcp); + + Task acceptTask = GetHelperBase(acceptMethod).AcceptAsync(server); + await WaitForEventAsync(events, "AcceptStart"); + + await GetHelperBase(connectMethod).ConnectAsync(client, server.LocalEndPoint); + await acceptTask; + + await WaitForEventAsync(events, "AcceptStop"); + await WaitForEventAsync(events, "ConnectStop"); + + await WaitForEventCountersAsync(events); + }); + Assert.DoesNotContain(events, ev => ev.Event.EventId == 0); // errors from the EventSource itself + + VerifyStartStopEvents(events, connect: true, expectedCount: 1); + VerifyStartStopEvents(events, connect: false, expectedCount: 1); + + Assert.DoesNotContain(events, e => e.Event.EventName == "ConnectFailed"); + Assert.DoesNotContain(events, e => e.Event.EventName == "AcceptFailed"); + + VerifyEventCounters(events, connectCount: 1); + }, connectMethod, acceptMethod).Dispose(); + } + + [OuterLoop] + [ConditionalTheory(typeof(RemoteExecutor), nameof(RemoteExecutor.IsSupported))] + [MemberData(nameof(SocketMethods_WithBools_MemberData))] + public void EventSource_SocketConnectsRemote_LogsConnectStartStop(string connectMethod, bool useDnsEndPoint) + { + RemoteExecutor.Invoke(async (connectMethod, useDnsEndPointString) => + { + using var listener = new TestEventListener("System.Net.Sockets", EventLevel.Verbose, 0.1); + listener.AddActivityTracking(); + + var events = new ConcurrentQueue<(EventWrittenEventArgs Event, Guid ActivityId)>(); + await listener.RunWithCallbackAsync(e => events.Enqueue((e, e.ActivityId)), async () => + { + using var client = new Socket(SocketType.Stream, ProtocolType.Tcp); + + SocketHelperBase socketHelper = GetHelperBase(connectMethod); + + EndPoint endPoint = await GetRemoteEndPointAsync(useDnsEndPointString, port: 443); + await socketHelper.ConnectAsync(client, endPoint); + + await WaitForEventAsync(events, "ConnectStop"); + + await WaitForEventCountersAsync(events); + }); + Assert.DoesNotContain(events, ev => ev.Event.EventId == 0); // errors from the EventSource itself + + VerifyStartStopEvents(events, connect: true, expectedCount: 1); + + Assert.DoesNotContain(events, e => e.Event.EventName == "ConnectFailed"); + + VerifyEventCounters(events, connectCount: 1, connectOnly: true); + }, connectMethod, useDnsEndPoint.ToString()).Dispose(); + } + + [OuterLoop] + [ConditionalTheory(typeof(RemoteExecutor), nameof(RemoteExecutor.IsSupported))] + [PlatformSpecific(~(TestPlatforms.OSX | TestPlatforms.FreeBSD))] // Same as Connect.ConnectGetsCanceledByDispose + [MemberData(nameof(SocketMethods_WithBools_MemberData))] + public void EventSource_SocketConnectFailure_LogsConnectFailed(string connectMethod, bool useDnsEndPoint) + { + if (connectMethod == "Sync" && PlatformDetection.IsRedHatFamily7) + { + // [ActiveIssue("https://github.com/dotnet/runtime/issues/42686")] + throw new SkipTestException("Disposing a Socket performing a sync operation can hang on RedHat7 systems"); + } + + RemoteExecutor.Invoke(async (connectMethod, useDnsEndPointString) => + { + EndPoint endPoint = await GetRemoteEndPointAsync(useDnsEndPointString, port: 12345); + + using var listener = new TestEventListener("System.Net.Sockets", EventLevel.Verbose, 0.1); + listener.AddActivityTracking(); + + var events = new ConcurrentQueue<(EventWrittenEventArgs Event, Guid ActivityId)>(); + await listener.RunWithCallbackAsync(e => events.Enqueue((e, e.ActivityId)), async () => + { + using var client = new Socket(SocketType.Stream, ProtocolType.Tcp); + + SocketHelperBase socketHelper = GetHelperBase(connectMethod); + + Exception ex = await Assert.ThrowsAnyAsync(async () => + { + Task connectTask = socketHelper.ConnectAsync(client, endPoint); + await WaitForEventAsync(events, "ConnectStart"); + Task disposeTask = Task.Run(() => client.Dispose()); + await new[] { connectTask, disposeTask }.WhenAllOrAnyFailed(); + }); + + if (ex is SocketException se) + { + Assert.NotEqual(SocketError.TimedOut, se.SocketErrorCode); + } + + await WaitForEventAsync(events, "ConnectStop"); + + await WaitForEventCountersAsync(events); + }); + + VerifyConnectFailureEvents(events); + }, connectMethod, useDnsEndPoint.ToString()).Dispose(); + } + + [OuterLoop] + [ConditionalTheory(typeof(RemoteExecutor), nameof(RemoteExecutor.IsSupported))] + [MemberData(nameof(SocketMethods_MemberData))] + public void EventSource_SocketAcceptFailure_LogsAcceptFailed(string acceptMethod) + { + if (acceptMethod == "Sync" && PlatformDetection.IsRedHatFamily7) + { + // [ActiveIssue("https://github.com/dotnet/runtime/issues/42686")] + throw new SkipTestException("Disposing a Socket performing a sync operation can hang on RedHat7 systems"); + } + + RemoteExecutor.Invoke(async acceptMethod => + { + using var listener = new TestEventListener("System.Net.Sockets", EventLevel.Verbose, 0.1); + listener.AddActivityTracking(); + + var events = new ConcurrentQueue<(EventWrittenEventArgs Event, Guid ActivityId)>(); + await listener.RunWithCallbackAsync(e => events.Enqueue((e, e.ActivityId)), async () => + { + using var server = new Socket(SocketType.Stream, ProtocolType.Tcp); + server.Bind(new IPEndPoint(IPAddress.Loopback, 0)); + server.Listen(); + + await Assert.ThrowsAnyAsync(async () => + { + Task acceptTask = GetHelperBase(acceptMethod).AcceptAsync(server); + await WaitForEventAsync(events, "AcceptStart"); + Task disposeTask = Task.Run(() => server.Dispose()); + await new[] { acceptTask, disposeTask }.WhenAllOrAnyFailed(); + }); + + await WaitForEventAsync(events, "AcceptStop"); + + await WaitForEventCountersAsync(events); + }); + Assert.DoesNotContain(events, ev => ev.Event.EventId == 0); // errors from the EventSource itself + + VerifyStartStopEvents(events, connect: false, expectedCount: 1); + + (EventWrittenEventArgs Event, Guid ActivityId) failed = Assert.Single(events, e => e.Event.EventName == "AcceptFailed"); + Assert.Equal(2, failed.Event.Payload.Count); + Assert.True(Enum.IsDefined((SocketError)failed.Event.Payload[0])); + Assert.IsType(failed.Event.Payload[1]); + + (_, Guid startActivityId) = Assert.Single(events, e => e.Event.EventName == "AcceptStart"); + Assert.Equal(startActivityId, failed.ActivityId); + + VerifyEventCounters(events, connectCount: 0); + }, acceptMethod).Dispose(); + } + + [OuterLoop] + [ConditionalTheory(typeof(RemoteExecutor), nameof(RemoteExecutor.IsSupported))] + [InlineData("Task", true)] + [InlineData("Task", false)] + [InlineData("Eap", true)] + [InlineData("Eap", false)] + public void EventSource_ConnectAsyncCanceled_LogsConnectFailed(string connectMethod, bool useDnsEndPoint) + { + RemoteExecutor.Invoke(async (connectMethod, useDnsEndPointString) => + { + EndPoint endPoint = await GetRemoteEndPointAsync(useDnsEndPointString, port: 12345); + + using var listener = new TestEventListener("System.Net.Sockets", EventLevel.Verbose, 0.1); + listener.AddActivityTracking(); + + var events = new ConcurrentQueue<(EventWrittenEventArgs Event, Guid ActivityId)>(); + await listener.RunWithCallbackAsync(e => events.Enqueue((e, e.ActivityId)), async () => + { + using var client = new Socket(SocketType.Stream, ProtocolType.Tcp); + + await Assert.ThrowsAnyAsync(async () => + { + switch (connectMethod) + { + case "Task": + using (var cts = new CancellationTokenSource()) + { + ValueTask connectTask = client.ConnectAsync(endPoint, cts.Token); + await WaitForEventAsync(events, "ConnectStart"); + cts.Cancel(); + await connectTask; + } + break; + + case "Eap": + using (var saea = new SocketAsyncEventArgs()) + { + var tcs = new TaskCompletionSource(); + saea.RemoteEndPoint = endPoint; + saea.Completed += (_, __) => + { + Assert.NotEqual(SocketError.Success, saea.SocketError); + tcs.SetException(new SocketException((int)saea.SocketError)); + }; + Assert.True(client.ConnectAsync(saea)); + await WaitForEventAsync(events, "ConnectStart"); + Socket.CancelConnectAsync(saea); + await tcs.Task; + } + break; + } + }); + + await WaitForEventAsync(events, "ConnectStop"); + + await WaitForEventCountersAsync(events); + }); + + VerifyConnectFailureEvents(events); + }, connectMethod, useDnsEndPoint.ToString()).Dispose(); + } + + private static void VerifyConnectFailureEvents(ConcurrentQueue<(EventWrittenEventArgs Event, Guid ActivityId)> events) + { + Assert.DoesNotContain(events, ev => ev.Event.EventId == 0); // errors from the EventSource itself + + VerifyStartStopEvents(events, connect: true, expectedCount: 1); + + (EventWrittenEventArgs Event, Guid ActivityId) failed = Assert.Single(events, e => e.Event.EventName == "ConnectFailed"); + Assert.Equal(2, failed.Event.Payload.Count); + Assert.True(Enum.IsDefined((SocketError)failed.Event.Payload[0])); + Assert.IsType(failed.Event.Payload[1]); + + (_, Guid startActivityId) = Assert.Single(events, e => e.Event.EventName == "ConnectStart"); + Assert.Equal(startActivityId, failed.ActivityId); + + VerifyEventCounters(events, connectCount: 0); + } + [OuterLoop] [ConditionalFact(typeof(RemoteExecutor), nameof(RemoteExecutor.IsSupported))] public void EventSource_EventsRaisedAsExpected() { - RemoteExecutor.Invoke(() => + RemoteExecutor.Invoke(async () => { using (var listener = new TestEventListener("System.Net.Sockets", EventLevel.Verbose, 0.1)) { - var events = new ConcurrentQueue(); - listener.RunWithCallbackAsync(events.Enqueue, async () => + listener.AddActivityTracking(); + + var events = new ConcurrentQueue<(EventWrittenEventArgs Event, Guid ActivityId)>(); + await listener.RunWithCallbackAsync(e => events.Enqueue((e, e.ActivityId)), async () => { // Invoke several tests to execute code paths while tracing is enabled @@ -65,36 +372,116 @@ public void EventSource_EventsRaisedAsExpected() await new NetworkStreamTest().CopyToAsync_AllDataCopied(4096, true).ConfigureAwait(false); await new NetworkStreamTest().Timeout_ValidData_Roundtrips().ConfigureAwait(false); - await Task.Delay(300).ConfigureAwait(false); - }).Wait(); - Assert.DoesNotContain(events, ev => ev.EventId == 0); // errors from the EventSource itself - VerifyEvents(events, "ConnectStart", 10); - VerifyEvents(events, "ConnectStop", 10); - - Dictionary eventCounters = events.Where(e => e.EventName == "EventCounters").Select(e => (IDictionary) e.Payload.Single()) - .GroupBy(d => (string)d["Name"], d => (double)d["Mean"], (k, v) => new { Name = k, Value = v.Sum() }) - .ToDictionary(p => p.Name, p => p.Value); - - VerifyEventCounter("incoming-connections-established", eventCounters); - VerifyEventCounter("outgoing-connections-established", eventCounters); - VerifyEventCounter("bytes-received", eventCounters); - VerifyEventCounter("bytes-sent", eventCounters); - VerifyEventCounter("datagrams-received", eventCounters); - VerifyEventCounter("datagrams-sent", eventCounters); + + await WaitForEventCountersAsync(events); + }); + Assert.DoesNotContain(events, ev => ev.Event.EventId == 0); // errors from the EventSource itself + + VerifyStartStopEvents(events, connect: true, expectedCount: 10); + + Assert.DoesNotContain(events, e => e.Event.EventName == "ConnectFailed"); + + VerifyEventCounters(events, connectCount: 10, shouldHaveTransferedBytes: true, shouldHaveDatagrams: true); } }).Dispose(); } - private static void VerifyEvents(IEnumerable events, string eventName, int expectedCount) + private static void VerifyStartStopEvents(ConcurrentQueue<(EventWrittenEventArgs Event, Guid ActivityId)> events, bool connect, int expectedCount) { - EventWrittenEventArgs[] starts = events.Where(e => e.EventName == eventName).ToArray(); + string startName = connect ? "ConnectStart" : "AcceptStart"; + (EventWrittenEventArgs Event, Guid ActivityId)[] starts = events.Where(e => e.Event.EventName == startName).ToArray(); Assert.Equal(expectedCount, starts.Length); + foreach ((EventWrittenEventArgs Event, _) in starts) + { + object startPayload = Assert.Single(Event.Payload); + Assert.False(string.IsNullOrWhiteSpace(startPayload as string)); + } + + string stopName = connect ? "ConnectStop" : "AcceptStop"; + (EventWrittenEventArgs Event, Guid ActivityId)[] stops = events.Where(e => e.Event.EventName == stopName).ToArray(); + Assert.Equal(expectedCount, stops.Length); + Assert.All(stops, stop => Assert.Empty(stop.Event.Payload)); + + for (int i = 0; i < expectedCount; i++) + { + Assert.NotEqual(Guid.Empty, starts[i].ActivityId); + Assert.Equal(starts[i].ActivityId, stops[i].ActivityId); + } + } + + private static async Task WaitForEventAsync(ConcurrentQueue<(EventWrittenEventArgs Event, Guid ActivityId)> events, string name) + { + DateTime startTime = DateTime.UtcNow; + while (!events.Any(e => e.Event.EventName == name)) + { + if (DateTime.UtcNow.Subtract(startTime) > TimeSpan.FromSeconds(30)) + throw new TimeoutException($"Timed out waiting for {name}"); + + await Task.Delay(100); + } + } + + private static async Task WaitForEventCountersAsync(ConcurrentQueue<(EventWrittenEventArgs Event, Guid ActivityId)> events) + { + DateTime startTime = DateTime.UtcNow; + int startCount = events.Count; + + while (events.Skip(startCount).Count(e => IsBytesSentEventCounter(e.Event)) < 2) + { + if (DateTime.UtcNow.Subtract(startTime) > TimeSpan.FromSeconds(30)) + throw new TimeoutException($"Timed out waiting for EventCounters"); + + await Task.Delay(100); + } + + static bool IsBytesSentEventCounter(EventWrittenEventArgs e) + { + if (e.EventName != "EventCounters") + return false; + + var dictionary = (IDictionary)e.Payload.Single(); + + return (string)dictionary["Name"] == "bytes-sent"; + } } - private static void VerifyEventCounter(string name, Dictionary eventCounters) + private static void VerifyEventCounters(ConcurrentQueue<(EventWrittenEventArgs Event, Guid ActivityId)> events, int connectCount, bool connectOnly = false, bool shouldHaveTransferedBytes = false, bool shouldHaveDatagrams = false) { - Assert.True(eventCounters.ContainsKey(name)); - Assert.True(eventCounters[name] > 0); + Dictionary eventCounters = events + .Where(e => e.Event.EventName == "EventCounters") + .Select(e => (IDictionary)e.Event.Payload.Single()) + .GroupBy(d => (string)d["Name"], d => (double)(d.ContainsKey("Mean") ? d["Mean"] : d["Increment"])) + .ToDictionary(p => p.Key, p => p.ToArray()); + + Assert.True(eventCounters.TryGetValue("outgoing-connections-established", out double[] outgoingConnections)); + Assert.Equal(connectCount, outgoingConnections[^1]); + + Assert.True(eventCounters.TryGetValue("incoming-connections-established", out double[] incomingConnections)); + Assert.Equal(connectOnly ? 0 : connectCount, incomingConnections[^1]); + + Assert.True(eventCounters.TryGetValue("bytes-received", out double[] bytesReceived)); + if (shouldHaveTransferedBytes) + { + Assert.True(bytesReceived[^1] > 0); + } + + Assert.True(eventCounters.TryGetValue("bytes-sent", out double[] bytesSent)); + if (shouldHaveTransferedBytes) + { + Assert.True(bytesSent[^1] > 0); + } + + Assert.True(eventCounters.TryGetValue("datagrams-received", out double[] datagramsReceived)); + if (shouldHaveDatagrams) + { + Assert.True(datagramsReceived[^1] > 0); + } + + Assert.True(eventCounters.TryGetValue("datagrams-sent", out double[] datagramsSent)); + if (shouldHaveDatagrams) + { + Assert.True(datagramsSent[^1] > 0); + } } } }