Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 133 additions & 21 deletions src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public partial class HubConnection : IAsyncDisposable
// Default amount of bytes we'll buffer when using Stateful Reconnect until applying backpressure to sends from the client.
internal const long DefaultStatefulReconnectBufferSize = 100_000;

internal const string ActivityName = "Microsoft.AspNetCore.SignalR.Client.InvocationOut";

// The receive loop has a single reader and single writer at a time so optimize the channel for that
private static readonly UnboundedChannelOptions _receiveLoopOptions = new UnboundedChannelOptions
{
Expand All @@ -73,11 +75,13 @@ public partial class HubConnection : IAsyncDisposable
private readonly ILoggerFactory _loggerFactory;
private readonly ILogger _logger;
private readonly ConnectionLogScope _logScope;
private readonly ActivitySource _activitySource;
private readonly IHubProtocol _protocol;
private readonly IServiceProvider _serviceProvider;
private readonly IConnectionFactory _connectionFactory;
private readonly IRetryPolicy? _reconnectPolicy;
private readonly EndPoint _endPoint;
private readonly string? _serviceName;
private readonly ConcurrentDictionary<string, InvocationHandlerList> _handlers = new ConcurrentDictionary<string, InvocationHandlerList>(StringComparer.Ordinal);

// Holds all mutable state other than user-defined handlers and settable properties.
Expand Down Expand Up @@ -235,6 +239,10 @@ public HubConnection(IConnectionFactory connectionFactory,

_logScope = new ConnectionLogScope();

// ActivitySource can be resolved from the service provider when unit testing.
_activitySource = (serviceProvider.GetService<SignalRClientActivitySource>() ?? SignalRClientActivitySource.Instance).ActivitySource;
_serviceName = (_endPoint is UriEndPoint e) ? e.Uri.AbsolutePath.Trim('/') : null;

var options = serviceProvider.GetService<IOptions<HubConnectionOptions>>();

ServerTimeout = options?.Value.ServerTimeout ?? DefaultServerTimeout;
Expand Down Expand Up @@ -720,7 +728,8 @@ async Task OnStreamCanceled(InvocationRequest irq)
var readers = default(Dictionary<string, object>);

CheckDisposed();
var connectionState = await _state.WaitForActiveConnectionAsync(nameof(StreamAsChannelCoreAsync), token: cancellationToken).ConfigureAwait(false);

var (connectionState, activity) = await WaitForActiveConnectionWithActivityAsync(nameof(StreamAsChannelCoreAsync), methodName, token: cancellationToken).ConfigureAwait(false);

ChannelReader<object?> channel;
try
Expand All @@ -731,7 +740,7 @@ async Task OnStreamCanceled(InvocationRequest irq)
readers = PackageStreamingParams(connectionState, ref args, out var streamIds);

// I just want an excuse to use 'irq' as a variable name...
var irq = InvocationRequest.Stream(cancellationToken, returnType, connectionState.GetNextId(), _loggerFactory, this, out channel);
var irq = InvocationRequest.Stream(cancellationToken, returnType, connectionState.GetNextId(), _loggerFactory, this, activity, out channel);
await InvokeStreamCore(connectionState, methodName, irq, args, streamIds?.ToArray(), cancellationToken).ConfigureAwait(false);

if (cancellationToken.CanBeCanceled)
Expand Down Expand Up @@ -1003,12 +1012,71 @@ private async Task CommonStreaming(ConnectionState connectionState, string strea
}
}

private async Task<(ConnectionState, Activity?)> WaitForActiveConnectionWithActivityAsync(string sendingMethodName, string invokedMethodName, CancellationToken token)
{
// Start the activity before waiting on the connection.
// Starting the activity here means time to connect or reconnect is included in the invoke.
var activity = CreateActivity(invokedMethodName);

try
{
ConnectionState connectionState;
var connectionStateTask = _state.WaitForActiveConnectionAsync(sendingMethodName, token);
if (connectionStateTask.Status == TaskStatus.RanToCompletion)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to set any server tags if connectionStateTask throws. It throws when you're either not connected or the user canceled the passed in CTS. In either case it's not actually doing anything except creating and stopping an activity.

Plus, it'd clean up the code a bit.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is a problem starting the connection then it would be useful to know the server address being called

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But the connection failing to start, or closing before the send calls, or not even calling start in the first place, is not reflected in SendAsync or InvokeAsync calls. All it knows is that there is no connection.

Copy link
Member Author

@JamesNK JamesNK Aug 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With telemetry it's useful to view it from the perspective of the user. The mechanics of what happens internally that could cause the call to fail are invisible to them. They're focused on the end result: the call they want to make failed and there was an error. Having information about what they were trying to do - make a call to a hub with the specified rpc.service, rpc.method, server.address, server.port - is what they care about. Then the error reason would be in error.type.

For an example of prior art, HttpClient always reports server.address and server.port when making a HTTP request. It does this even if the server connection couldn't be established, or the request was canceled before sending any data.

If you want to make it clearer that the call failed because of the connection not being available, there is room to do that by providing good values to error.type. By default, it is the exception type name, but it can be customized in known scenarios. For example, if there is a problem with the connection, error.type could be something like: negotiate-failed, hub-connection-not-started, etc.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, server.address and server.port are considered required attributes according to the spec: https://github.com/open-telemetry/semantic-conventions/blob/1e34b57b9f73b08b109cdc0e8841e857e5f5c205/docs/rpc/rpc-spans.md#client-attributes

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok thanks, I wanted to know some concrete reason we were including it.

{
// Attempt to get already connected connection and set server tags using it.
connectionState = connectionStateTask.Result;
SetServerTags(activity, connectionState.ConnectionUrl);
activity?.Start();
}
else
{
// Fallback to using configured endpoint.
var initialUri = (_endPoint as UriEndPoint)?.Uri;
SetServerTags(activity, initialUri);
activity?.Start();

connectionState = await connectionStateTask.ConfigureAwait(false);

// After connection is returned, check if URL is different. If so, update activity server tags.
if (connectionState.ConnectionUrl != null && connectionState.ConnectionUrl != initialUri)
{
SetServerTags(activity, connectionState.ConnectionUrl);
}
}

return (connectionState, activity);
}
catch (Exception ex)
{
// If there is an error getting an active connection then the invocation has failed.
if (activity is not null)
{
activity.SetStatus(ActivityStatusCode.Error);
activity.SetTag("error.type", ex.GetType().FullName);
activity.Stop();
}

throw;
}

static void SetServerTags(Activity? activity, Uri? uri)
{
if (activity != null && uri != null)
{
activity.SetTag("server.address", uri.Host);
activity.SetTag("server.port", uri.Port);
}
}
}

private async Task<object?> InvokeCoreAsyncCore(string methodName, Type returnType, object?[] args, CancellationToken cancellationToken)
{
var readers = default(Dictionary<string, object>);

CheckDisposed();
var connectionState = await _state.WaitForActiveConnectionAsync(nameof(InvokeCoreAsync), token: cancellationToken).ConfigureAwait(false);

var (connectionState, activity) = await WaitForActiveConnectionWithActivityAsync(nameof(InvokeCoreAsync), methodName, token: cancellationToken).ConfigureAwait(false);

Task<object?> invocationTask;
try
Expand All @@ -1017,7 +1085,7 @@ private async Task CommonStreaming(ConnectionState connectionState, string strea

readers = PackageStreamingParams(connectionState, ref args, out var streamIds);

var irq = InvocationRequest.Invoke(cancellationToken, returnType, connectionState.GetNextId(), _loggerFactory, this, out invocationTask);
var irq = InvocationRequest.Invoke(cancellationToken, returnType, connectionState.GetNextId(), _loggerFactory, this, activity, out invocationTask);
await InvokeCore(connectionState, methodName, irq, args, streamIds?.ToArray(), cancellationToken).ConfigureAwait(false);

LaunchStreams(connectionState, readers, cancellationToken);
Expand All @@ -1031,13 +1099,43 @@ private async Task CommonStreaming(ConnectionState connectionState, string strea
return await invocationTask.ConfigureAwait(false);
}

private Activity? CreateActivity(string methodName)
{
var activity = _activitySource.CreateActivity(ActivityName, ActivityKind.Client);
if (activity is null && Activity.Current is not null && _logger.IsEnabled(LogLevel.Critical))
{
activity = new Activity(ActivityName);
}

if (activity is not null)
{
if (!string.IsNullOrEmpty(_serviceName))
{
activity.DisplayName = $"{_serviceName}/{methodName}";
activity.SetTag("rpc.service", _serviceName);
}
else
{
activity.DisplayName = methodName;
}

activity.SetTag("rpc.system", "signalr");
activity.SetTag("rpc.method", methodName);
}

return activity;
}

private async Task InvokeCore(ConnectionState connectionState, string methodName, InvocationRequest irq, object?[] args, string[]? streams, CancellationToken cancellationToken)
{
Log.PreparingBlockingInvocation(_logger, irq.InvocationId, methodName, irq.ResultType.FullName!, args.Length);

// Client invocations are always blocking
var invocationMessage = new InvocationMessage(irq.InvocationId, methodName, args, streams);
InjectHeaders(invocationMessage);
if (irq.Activity is not null)
{
InjectHeaders(irq.Activity, invocationMessage);
}

Log.RegisteringInvocation(_logger, irq.InvocationId);
connectionState.AddInvocation(irq);
Expand All @@ -1064,7 +1162,10 @@ private async Task InvokeStreamCore(ConnectionState connectionState, string meth
Log.PreparingStreamingInvocation(_logger, irq.InvocationId, methodName, irq.ResultType.FullName!, args.Length);

var invocationMessage = new StreamInvocationMessage(irq.InvocationId, methodName, args, streams);
InjectHeaders(invocationMessage);
if (irq.Activity is not null)
{
InjectHeaders(irq.Activity, invocationMessage);
}

Log.RegisteringInvocation(_logger, irq.InvocationId);

Expand All @@ -1085,23 +1186,16 @@ private async Task InvokeStreamCore(ConnectionState connectionState, string meth
}
}

private static void InjectHeaders(HubInvocationMessage invocationMessage)
private static void InjectHeaders(Activity currentActivity, HubInvocationMessage invocationMessage)
{
// TODO: Change when SignalR client has an activity.
// This sends info about the current activity, regardless of the activity source, to the SignalR server.
// When SignalR client supports client activities this logic should be updated to only send headers
// if the SignalR client activity is created. The goal is to match the behavior of distributed tracing in HttpClient.
if (Activity.Current is { } currentActivity)
DistributedContextPropagator.Current.Inject(currentActivity, invocationMessage, static (carrier, key, value) =>
{
DistributedContextPropagator.Current.Inject(currentActivity, invocationMessage, static (carrier, key, value) =>
if (carrier is HubInvocationMessage invocationMessage)
{
if (carrier is HubInvocationMessage invocationMessage)
{
invocationMessage.Headers ??= new Dictionary<string, string>();
invocationMessage.Headers[key] = value;
}
});
}
invocationMessage.Headers ??= new Dictionary<string, string>();
invocationMessage.Headers[key] = value;
}
});
}

private async Task SendHubMessage(ConnectionState connectionState, HubMessage hubMessage, CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -1131,7 +1225,8 @@ private async Task SendCoreAsyncCore(string methodName, object?[] args, Cancella
var readers = default(Dictionary<string, object>);

CheckDisposed();
var connectionState = await _state.WaitForActiveConnectionAsync(nameof(SendCoreAsync), token: cancellationToken).ConfigureAwait(false);

var (connectionState, activity) = await WaitForActiveConnectionWithActivityAsync(nameof(SendCoreAsync), methodName, token: cancellationToken).ConfigureAwait(false);
try
{
CheckDisposed();
Expand All @@ -1140,12 +1235,27 @@ private async Task SendCoreAsyncCore(string methodName, object?[] args, Cancella

Log.PreparingNonBlockingInvocation(_logger, methodName, args.Length);
var invocationMessage = new InvocationMessage(null, methodName, args, streamIds?.ToArray());
if (activity is not null)
{
InjectHeaders(activity, invocationMessage);
}
await SendHubMessage(connectionState, invocationMessage, cancellationToken).ConfigureAwait(false);

LaunchStreams(connectionState, readers, cancellationToken);
}
catch (Exception ex)
{
if (activity is not null)
{
activity.SetStatus(ActivityStatusCode.Error);
activity.SetTag("error.type", ex.GetType().FullName);
activity.Stop();
}
throw;
}
finally
{
activity?.Stop();
_state.ReleaseConnectionLock();
}
}
Expand Down Expand Up @@ -2018,6 +2128,7 @@ private sealed class ConnectionState : IInvocationBinder
private long _nextActivationSendPing;

public ConnectionContext Connection { get; }
public Uri? ConnectionUrl { get; }
public Task? ReceiveTask { get; set; }
public Exception? CloseException { get; set; }
public CancellationToken UploadStreamToken { get; set; }
Expand All @@ -2036,6 +2147,7 @@ public bool Stopping
public ConnectionState(ConnectionContext connection, HubConnection hubConnection)
{
Connection = connection;
ConnectionUrl = (connection.RemoteEndPoint is UriEndPoint ep) ? ep.Uri : null;

_hubConnection = hubConnection;
_hubConnection._logScope.ConnectionId = connection.ConnectionId;
Expand Down
Loading