Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consume DistributedContextPropagator APIs in DiagnosticsHandler #55392

Merged
merged 1 commit into from Jul 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -527,6 +527,14 @@ private Activity CreateW3CActivity(string name, string state, IEnumerable<KeyVal
return list;
}

[Fact]
public void TestBuiltInPropagatorsAreCached()
{
Assert.Same(DistributedContextPropagator.CreateDefaultPropagator(), DistributedContextPropagator.CreateDefaultPropagator());
Assert.Same(DistributedContextPropagator.CreateNoOutputPropagator(), DistributedContextPropagator.CreateNoOutputPropagator());
Assert.Same(DistributedContextPropagator.CreatePassThroughPropagator(), DistributedContextPropagator.CreatePassThroughPropagator());
}

[ConditionalFact(typeof(RemoteExecutor), nameof(RemoteExecutor.IsSupported))]
public void TestCustomPropagator()
{
Expand Down
2 changes: 2 additions & 0 deletions src/libraries/System.Net.Http/ref/System.Net.Http.cs
Expand Up @@ -394,6 +394,8 @@ public sealed partial class SocketsHttpHandler : System.Net.Http.HttpMessageHand
public bool EnableMultipleHttp2Connections { get { throw null; } set { } }
public Func<SocketsHttpConnectionContext, System.Threading.CancellationToken, System.Threading.Tasks.ValueTask<System.IO.Stream>>? ConnectCallback { get { throw null; } set { } }
public Func<SocketsHttpPlaintextStreamFilterContext, System.Threading.CancellationToken, System.Threading.Tasks.ValueTask<System.IO.Stream>>? PlaintextStreamFilter { get { throw null; } set { } }
[System.CLSCompliantAttribute(false)]
public System.Diagnostics.DistributedContextPropagator? ActivityHeadersPropagator { get { throw null; } set { } }
}
public sealed class SocketsHttpConnectionContext
{
Expand Down
1 change: 1 addition & 0 deletions src/libraries/System.Net.Http/ref/System.Net.Http.csproj
Expand Up @@ -14,5 +14,6 @@
<ProjectReference Include="..\..\System.Net.Security\ref\System.Net.Security.csproj" />
<ProjectReference Include="..\..\System.Security.Cryptography.X509Certificates\ref\System.Security.Cryptography.X509Certificates.csproj" />
<ProjectReference Include="..\..\System.Text.Encoding\ref\System.Text.Encoding.csproj" />
<ProjectReference Include="..\..\System.Diagnostics.DiagnosticSource\ref\System.Diagnostics.DiagnosticSource.csproj" />
</ItemGroup>
</Project>
Expand Up @@ -3,13 +3,12 @@

using System.Collections.Generic;
using System.IO;
using System.Net.Quic;
using System.Net.Quic.Implementations;
using System.Net.Security;
using System.Runtime.Versioning;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics.CodeAnalysis;
using System.Diagnostics;

namespace System.Net.Http
{
Expand Down Expand Up @@ -173,6 +172,13 @@ public HttpKeepAlivePingPolicy KeepAlivePingPolicy
set => throw new PlatformNotSupportedException();
}

[CLSCompliant(false)]
tarekgh marked this conversation as resolved.
Show resolved Hide resolved
public DistributedContextPropagator? ActivityHeadersPropagator
{
get => throw new PlatformNotSupportedException();
set => throw new PlatformNotSupportedException();
}

protected internal override Task<HttpResponseMessage> SendAsync(
HttpRequestMessage request, CancellationToken cancellationToken) => throw new PlatformNotSupportedException();

Expand Down
Expand Up @@ -13,36 +13,59 @@ namespace System.Net.Http
/// <summary>
/// DiagnosticHandler notifies DiagnosticSource subscribers about outgoing Http requests
/// </summary>
internal sealed class DiagnosticsHandler : DelegatingHandler
internal sealed class DiagnosticsHandler : HttpMessageHandlerStage
{
private static readonly DiagnosticListener s_diagnosticListener =
new DiagnosticListener(DiagnosticsHandlerLoggingStrings.DiagnosticListenerName);

/// <summary>
/// DiagnosticHandler constructor
/// </summary>
/// <param name="innerHandler">Inner handler: Windows or Unix implementation of HttpMessageHandler.
/// Note that DiagnosticHandler is the latest in the pipeline </param>
public DiagnosticsHandler(HttpMessageHandler innerHandler) : base(innerHandler)
private readonly HttpMessageHandler _innerHandler;
private readonly DistributedContextPropagator _propagator;
private readonly HeaderDescriptor[]? _propagatorFields;

public DiagnosticsHandler(HttpMessageHandler innerHandler, DistributedContextPropagator propagator, bool autoRedirect = false)
{
Debug.Assert(IsGloballyEnabled());
Debug.Assert(innerHandler is not null && propagator is not null);

_innerHandler = innerHandler;
_propagator = propagator;

// Prepare HeaderDescriptors for fields we need to clear when following redirects
if (autoRedirect && _propagator.Fields is IReadOnlyCollection<string> fields && fields.Count > 0)
{
var fieldDescriptors = new List<HeaderDescriptor>(fields.Count);
foreach (string field in fields)
{
if (field is not null && HeaderDescriptor.TryGet(field, out HeaderDescriptor descriptor))
{
fieldDescriptors.Add(descriptor);
}
}
_propagatorFields = fieldDescriptors.ToArray();
}
}

internal static bool IsEnabled()
private static bool IsEnabled()
MihaZupan marked this conversation as resolved.
Show resolved Hide resolved
{
// check if there is a parent Activity (and propagation is not suppressed)
// or if someone listens to HttpHandlerDiagnosticListener
return IsGloballyEnabled() && (Activity.Current != null || s_diagnosticListener.IsEnabled());
// check if there is a parent Activity or if someone listens to HttpHandlerDiagnosticListener
return Activity.Current != null || s_diagnosticListener.IsEnabled();
}

internal static bool IsGloballyEnabled() => GlobalHttpSettings.DiagnosticsHandler.EnableActivityPropagation;

// SendAsyncCore returns already completed ValueTask for when async: false is passed.
// Internally, it calls the synchronous Send method of the base class.
protected internal override HttpResponseMessage Send(HttpRequestMessage request, CancellationToken cancellationToken) =>
SendAsyncCore(request, async: false, cancellationToken).AsTask().GetAwaiter().GetResult();

protected internal override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) =>
SendAsyncCore(request, async: true, cancellationToken).AsTask();
internal override ValueTask<HttpResponseMessage> SendAsync(HttpRequestMessage request, bool async, CancellationToken cancellationToken)
{
if (IsEnabled())
{
return SendAsyncCore(request, async, cancellationToken);
}
else
{
return async ?
new ValueTask<HttpResponseMessage>(_innerHandler.SendAsync(request, cancellationToken)) :
new ValueTask<HttpResponseMessage>(_innerHandler.Send(request, cancellationToken));
}
}

private async ValueTask<HttpResponseMessage> SendAsyncCore(HttpRequestMessage request, bool async,
CancellationToken cancellationToken)
Expand All @@ -58,6 +81,16 @@ internal static bool IsEnabled()
throw new ArgumentNullException(nameof(request), SR.net_http_handler_norequest);
}

// Since we are reusing the request message instance on redirects, clear any existing headers
// Do so before writing DiagnosticListener events as instrumentations use those to inject headers
if (request.WasRedirected() && _propagatorFields is HeaderDescriptor[] fields)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I recall OpenTelemetry defines Fields as a hint about which fields would be inspected but not a guarantee. In particular they left the door open that some protocol would have field names dynamically determined so it would be impossible to provide an exhaustive list. Of course in practice I am not aware of any propagator implementation that does this.

If we want to do this we should probably clarify in our API comments that our definition of Fields is strict and we don't support the dynamic shenanigans that OT left the door open to.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want to do this we should probably clarify in our API comments that our definition of Fields is strict and we don't support the dynamic shenanigans that OT left the door open to.

That would be my preference.

I can't think of a different approach that wouldn't break some scenarios. For example doing the check inside the setter callback and removing the header there instead is too late since instrumentations like AI/Otel may try to add headers before us - we really have to clear the headers in advance.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tarekgh - making sure you see this : )

{
foreach (HeaderDescriptor field in fields)
{
request.Headers.Remove(field);
MihaZupan marked this conversation as resolved.
Show resolved Hide resolved
}
}

Activity? activity = null;
DiagnosticListener diagnosticListener = s_diagnosticListener;

Expand All @@ -72,8 +105,8 @@ internal static bool IsEnabled()
try
{
return async ?
await base.SendAsync(request, cancellationToken).ConfigureAwait(false) :
base.Send(request, cancellationToken);
await _innerHandler.SendAsync(request, cancellationToken).ConfigureAwait(false) :
_innerHandler.Send(request, cancellationToken);
}
finally
{
Expand Down Expand Up @@ -119,8 +152,8 @@ internal static bool IsEnabled()
try
{
response = async ?
await base.SendAsync(request, cancellationToken).ConfigureAwait(false) :
base.Send(request, cancellationToken);
await _innerHandler.SendAsync(request, cancellationToken).ConfigureAwait(false) :
_innerHandler.Send(request, cancellationToken);
return response;
}
catch (OperationCanceledException)
Expand Down Expand Up @@ -170,6 +203,16 @@ internal static bool IsEnabled()
}
}

protected override void Dispose(bool disposing)
{
if (disposing)
{
_innerHandler.Dispose();
}

base.Dispose(disposing);
}

#region private

private sealed class ActivityStartData
Expand Down Expand Up @@ -269,42 +312,18 @@ internal ResponseData(HttpResponseMessage? response, Guid loggingRequestId, long
public override string ToString() => $"{{ {nameof(Response)} = {Response}, {nameof(LoggingRequestId)} = {LoggingRequestId}, {nameof(Timestamp)} = {Timestamp}, {nameof(RequestTaskStatus)} = {RequestTaskStatus} }}";
}

private static void InjectHeaders(Activity currentActivity, HttpRequestMessage request)
private void InjectHeaders(Activity currentActivity, HttpRequestMessage request)
{
if (currentActivity.IdFormat == ActivityIdFormat.W3C)
_propagator.Inject(currentActivity, request, static (carrier, key, value) =>
{
if (!request.Headers.Contains(DiagnosticsHandlerLoggingStrings.TraceParentHeaderName))
if (carrier is HttpRequestMessage request &&
key is not null &&
HeaderDescriptor.TryGet(key, out HeaderDescriptor descriptor) &&
!request.Headers.TryGetHeaderValue(descriptor, out _))
{
request.Headers.TryAddWithoutValidation(DiagnosticsHandlerLoggingStrings.TraceParentHeaderName, currentActivity.Id);
if (currentActivity.TraceStateString != null)
{
request.Headers.TryAddWithoutValidation(DiagnosticsHandlerLoggingStrings.TraceStateHeaderName, currentActivity.TraceStateString);
}
request.Headers.TryAddWithoutValidation(descriptor, value);
}
}
else
{
if (!request.Headers.Contains(DiagnosticsHandlerLoggingStrings.RequestIdHeaderName))
{
request.Headers.TryAddWithoutValidation(DiagnosticsHandlerLoggingStrings.RequestIdHeaderName, currentActivity.Id);
}
}

// we expect baggage to be empty or contain a few items
using (IEnumerator<KeyValuePair<string, string?>> e = currentActivity.Baggage.GetEnumerator())
{
if (e.MoveNext())
{
var baggage = new List<string>();
do
{
KeyValuePair<string, string?> item = e.Current;
baggage.Add(new NameValueHeaderValue(WebUtility.UrlEncode(item.Key), WebUtility.UrlEncode(item.Value)).ToString());
}
while (e.MoveNext());
request.Headers.TryAddWithoutValidation(DiagnosticsHandlerLoggingStrings.CorrelationContextHeaderName, baggage);
}
}
});
}

[UnconditionalSuppressMessage("ReflectionAnalysis", "IL2026:UnrecognizedReflectionPattern",
Expand Down
Expand Up @@ -15,11 +15,5 @@ internal static class DiagnosticsHandlerLoggingStrings
public const string ExceptionEventName = "System.Net.Http.Exception";
public const string ActivityName = "System.Net.Http.HttpRequestOut";
public const string ActivityStartName = "System.Net.Http.HttpRequestOut.Start";

public const string RequestIdHeaderName = "Request-Id";
public const string CorrelationContextHeaderName = "Correlation-Context";

public const string TraceParentHeaderName = "traceparent";
public const string TraceStateHeaderName = "tracestate";
}
}
Expand Up @@ -9,6 +9,7 @@
using System.Security.Cryptography.X509Certificates;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
using System.Diagnostics;
using System.Diagnostics;

#if TARGET_BROWSER
using HttpHandlerType = System.Net.Http.BrowserHttpHandler;
#else
Expand All @@ -20,18 +21,30 @@ namespace System.Net.Http
public partial class HttpClientHandler : HttpMessageHandler
{
private readonly HttpHandlerType _underlyingHandler;
private readonly DiagnosticsHandler? _diagnosticsHandler;

private HttpMessageHandler Handler
#if TARGET_BROWSER
{ get; }
#else
=> _underlyingHandler;
#endif

private ClientCertificateOption _clientCertificateOptions;

private volatile bool _disposed;

public HttpClientHandler()
{
_underlyingHandler = new HttpHandlerType();

#if TARGET_BROWSER
Handler = _underlyingHandler;
if (DiagnosticsHandler.IsGloballyEnabled())
{
_diagnosticsHandler = new DiagnosticsHandler(_underlyingHandler);
Handler = new DiagnosticsHandler(Handler, DistributedContextPropagator.Current);
}
#endif

ClientCertificateOptions = ClientCertificateOption.Manual;
}

Expand Down Expand Up @@ -288,21 +301,11 @@ public SslProtocols SslProtocols
public IDictionary<string, object?> Properties => _underlyingHandler.Properties;

[UnsupportedOSPlatform("browser")]
protected internal override HttpResponseMessage Send(HttpRequestMessage request,
CancellationToken cancellationToken)
{
return DiagnosticsHandler.IsEnabled() && _diagnosticsHandler != null ?
_diagnosticsHandler.Send(request, cancellationToken) :
_underlyingHandler.Send(request, cancellationToken);
}
protected internal override HttpResponseMessage Send(HttpRequestMessage request, CancellationToken cancellationToken) =>
Handler.Send(request, cancellationToken);

protected internal override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request,
CancellationToken cancellationToken)
{
return DiagnosticsHandler.IsEnabled() && _diagnosticsHandler != null ?
_diagnosticsHandler.SendAsync(request, cancellationToken) :
_underlyingHandler.SendAsync(request, cancellationToken);
}
protected internal override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) =>
Handler.SendAsync(request, cancellationToken);

// lazy-load the validator func so it can be trimmed by the ILLinker if it isn't used.
private static Func<HttpRequestMessage, X509Certificate2?, X509Chain?, SslPolicyErrors, bool>? s_dangerousAcceptAnyServerCertificateValidator;
Expand Down
Expand Up @@ -15,6 +15,7 @@ public class HttpRequestMessage : IDisposable

private const int MessageNotYetSent = 0;
private const int MessageAlreadySent = 1;
private const int MessageIsRedirect = 2;

// Track whether the message has been sent.
// The message shouldn't be sent again if this field is equal to MessageAlreadySent.
Expand Down Expand Up @@ -159,12 +160,13 @@ public override string ToString()
return sb.ToString();
}

internal bool MarkAsSent()
{
return Interlocked.Exchange(ref _sendStatus, MessageAlreadySent) == MessageNotYetSent;
}
internal bool MarkAsSent() => Interlocked.CompareExchange(ref _sendStatus, MessageAlreadySent, MessageNotYetSent) == MessageNotYetSent;

internal bool WasSentByHttpClient() => (_sendStatus & MessageAlreadySent) != 0;

internal void MarkAsRedirected() => _sendStatus |= MessageIsRedirect;

internal bool WasSentByHttpClient() => _sendStatus == MessageAlreadySent;
internal bool WasRedirected() => (_sendStatus & MessageIsRedirect) != 0;

#region IDisposable Members

Expand Down
Expand Up @@ -8,6 +8,7 @@
using System.Runtime.Versioning;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;

namespace System.Net.Http
{
Expand Down Expand Up @@ -47,6 +48,8 @@ internal sealed class HttpConnectionSettings
internal HeaderEncodingSelector<HttpRequestMessage>? _requestHeaderEncodingSelector;
internal HeaderEncodingSelector<HttpRequestMessage>? _responseHeaderEncodingSelector;

internal DistributedContextPropagator? _activityHeadersPropagator = DistributedContextPropagator.Current;

internal Version _maxHttpVersion;

internal SslClientAuthenticationOptions? _sslOptions;
Expand Down Expand Up @@ -119,6 +122,7 @@ public HttpConnectionSettings CloneAndNormalize()
_connectCallback = _connectCallback,
_plaintextStreamFilter = _plaintextStreamFilter,
_initialHttp2StreamWindowSize = _initialHttp2StreamWindowSize,
_activityHeadersPropagator = _activityHeadersPropagator,
};

// TODO: Remove if/when QuicImplementationProvider is removed from System.Net.Quic.
Expand Down
Expand Up @@ -75,6 +75,8 @@ internal override async ValueTask<HttpResponseMessage> SendAsync(HttpRequestMess
}
}

request.MarkAsRedirected();

// Issue the redirected request.
response = await _redirectInnerHandler.SendAsync(request, async, cancellationToken).ConfigureAwait(false);
}
Expand Down