-
Notifications
You must be signed in to change notification settings - Fork 9.9k
/
HubCallerClients.cs
149 lines (123 loc) · 5.22 KB
/
HubCallerClients.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
namespace Microsoft.AspNetCore.SignalR.Internal;
internal sealed class HubCallerClients : IHubCallerClients
{
    private readonly string _connectionId;
    private readonly IHubClients _hubClients;
    internal readonly ChannelBasedSemaphore _parallelInvokes;

    // Starts at 1; TrySetSemaphoreReleased atomically swaps it to 0 for the first caller that claims the release.
    private int _shouldReleaseSemaphore = 1;

    // Client results don't work in OnConnectedAsync.
    // This property is controlled by the hub dispatcher. While it is false, Caller and Client(...)
    // return a proxy that can still send messages but throws from InvokeCoreAsync, which keeps
    // users from making blocking client calls.
    internal bool InvokeAllowed { get; set; }

    public HubCallerClients(IHubClients hubClients, string connectionId, ChannelBasedSemaphore parallelInvokes)
    {
        _hubClients = hubClients;
        _connectionId = connectionId;
        _parallelInvokes = parallelInvokes;
    }

    IClientProxy IHubCallerClients<IClientProxy>.Caller => Caller;

    // Proxy for the connection this instance was created for.
    public ISingleClientProxy Caller => CreateSingleClientProxy(_connectionId);

    // Every connection except the one this instance was created for.
    public IClientProxy Others
    {
        get
        {
            var excluded = new[] { _connectionId };
            return _hubClients.AllExcept(excluded);
        }
    }

    public IClientProxy All
    {
        get { return _hubClients.All; }
    }

    public IClientProxy AllExcept(IReadOnlyList<string> excludedConnectionIds)
        => _hubClients.AllExcept(excludedConnectionIds);

    IClientProxy IHubClients<IClientProxy>.Client(string connectionId) => Client(connectionId);

    // Proxy for a single, explicitly named connection.
    public ISingleClientProxy Client(string connectionId) => CreateSingleClientProxy(connectionId);

    public IClientProxy Group(string groupName) => _hubClients.Group(groupName);

    public IClientProxy Groups(IReadOnlyList<string> groupNames) => _hubClients.Groups(groupNames);

    // The named group minus the connection this instance was created for.
    public IClientProxy OthersInGroup(string groupName)
    {
        var excluded = new[] { _connectionId };
        return _hubClients.GroupExcept(groupName, excluded);
    }

    public IClientProxy GroupExcept(string groupName, IReadOnlyList<string> excludedConnectionIds)
        => _hubClients.GroupExcept(groupName, excludedConnectionIds);

    public IClientProxy User(string userId) => _hubClients.User(userId);

    public IClientProxy Clients(IReadOnlyList<string> connectionIds) => _hubClients.Clients(connectionIds);

    public IClientProxy Users(IReadOnlyList<string> userIds) => _hubClients.Users(userIds);

    // Returns true when the caller has just claimed responsibility for releasing the semaphore,
    // false when the release was already claimed by another caller.
    internal bool TrySetSemaphoreReleased()
    {
        var previous = Interlocked.CompareExchange(ref _shouldReleaseSemaphore, 0, 1);
        return previous == 1;
    }

    // Wraps the underlying single-client proxy for the given connection, choosing the wrapper
    // according to whether client invocations are currently allowed.
    private ISingleClientProxy CreateSingleClientProxy(string connectionId)
    {
        if (InvokeAllowed)
        {
            return new SingleClientProxy(_hubClients.Client(connectionId), this);
        }

        return new NoInvokeSingleClientProxy(_hubClients.Client(connectionId));
    }

    // Single-client proxy that forwards sends but rejects invocations that wait for a client result.
    private sealed class NoInvokeSingleClientProxy : ISingleClientProxy
    {
        private readonly ISingleClientProxy _inner;

        public NoInvokeSingleClientProxy(ISingleClientProxy inner)
        {
            _inner = inner;
        }

        // Throws synchronously; no task is ever returned.
        public Task<T> InvokeCoreAsync<T>(string method, object?[] args, CancellationToken cancellationToken = default)
            => throw new InvalidOperationException("Client results inside OnConnectedAsync Hub methods are not allowed.");

        public Task SendCoreAsync(string method, object?[] args, CancellationToken cancellationToken = default)
            => _inner.SendCoreAsync(method, args, cancellationToken);
    }

    // Single-client proxy that releases the owning HubCallerClients' semaphore (at most once)
    // before waiting for a client result.
    private sealed class SingleClientProxy : ISingleClientProxy
    {
        private readonly ISingleClientProxy _inner;
        private readonly HubCallerClients _owner;

        public SingleClientProxy(ISingleClientProxy inner, HubCallerClients owner)
        {
            _inner = inner;
            _owner = owner;
        }

        public async Task<T> InvokeCoreAsync<T>(string method, object?[] args, CancellationToken cancellationToken = default)
        {
            // The channel behind _parallelInvokes blocks pending invokes, which in turn can block the
            // receive loop. We are about to wait for a result from the client, so the receive loop must
            // be allowed to run or we would wait forever.
            // The release happens only once: TrySetSemaphoreReleased flips _shouldReleaseSemaphore to 0,
            // which also tells the DefaultHubDispatcher not to call Release again.
            if (_owner.TrySetSemaphoreReleased())
            {
                _owner._parallelInvokes.Release();
            }

            return await _inner.InvokeCoreAsync<T>(method, args, cancellationToken);
        }

        public Task SendCoreAsync(string method, object?[] args, CancellationToken cancellationToken = default)
            => _inner.SendCoreAsync(method, args, cancellationToken);
    }
}