/
Connection.cs
146 lines (128 loc) · 4.77 KB
/
Connection.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using SignalR.Infrastructure;
using SignalR.MessageBus;
namespace SignalR
{
/// <summary>
/// A connection on top of an <see cref="IMessageBus"/>: it publishes values to signal keys on the
/// bus and retrieves the messages addressed to the signals and groups it was constructed with.
/// </summary>
/// <remarks>
/// Group membership and the disconnect flag are mutable state that is updated when command
/// messages are encountered while a response is being built. This class performs no
/// synchronization of its own.
/// </remarks>
public class Connection : IConnection, IReceivingConnection
{
    private readonly IMessageBus _messageBus;
    private readonly IJsonSerializer _serializer;
    private readonly string _baseSignal;
    private readonly string _connectionId;
    private readonly HashSet<string> _signals;
    private readonly HashSet<string> _groups;
    private readonly ITraceManager _trace;

    // Set to true when a Disconnect command is processed; this class never resets it.
    private bool _disconnected;

    /// <summary>
    /// Initializes a new connection.
    /// </summary>
    /// <param name="messageBus">Bus used to send and to retrieve messages.</param>
    /// <param name="jsonSerializer">Serializer passed to <c>WrappedValue</c> when wrapping and unwrapping message values.</param>
    /// <param name="baseSignal">Signal key used by <see cref="Broadcast(object)"/>.</param>
    /// <param name="connectionId">Identifier of this connection; also used as the key for <see cref="Send"/> and as the source passed to the bus.</param>
    /// <param name="signals">Initial signals to receive on; copied into a private set.</param>
    /// <param name="groups">Initial groups to receive on; copied into a private set.</param>
    /// <param name="traceManager">Provides the trace source used to log received messages.</param>
    public Connection(IMessageBus messageBus,
                      IJsonSerializer jsonSerializer,
                      string baseSignal,
                      string connectionId,
                      IEnumerable<string> signals,
                      IEnumerable<string> groups,
                      ITraceManager traceManager)
    {
        _messageBus = messageBus;
        _serializer = jsonSerializer;
        _baseSignal = baseSignal;
        _connectionId = connectionId;
        _signals = new HashSet<string>(signals);
        _groups = new HashSet<string>(groups);
        _trace = traceManager;
    }

    /// <summary>
    /// Gets a snapshot of every key this connection receives on: its signals followed by its groups.
    /// </summary>
    private IEnumerable<string> Signals
    {
        get
        {
            // Materialize eagerly instead of returning the deferred Concat query: the bus may
            // enumerate the keys more than once, or after ProcessCommand has mutated _groups,
            // and enumerating a HashSet that was modified mid-iteration throws.
            return _signals.Concat(_groups).ToList();
        }
    }

    /// <summary>
    /// Sends <paramref name="value"/> to the base signal supplied at construction.
    /// </summary>
    /// <param name="value">The value to send.</param>
    /// <returns>The task returned by the underlying send.</returns>
    public virtual Task Broadcast(object value)
    {
        return Broadcast(_baseSignal, value);
    }

    /// <summary>
    /// Sends <paramref name="value"/> to the signal identified by <paramref name="key"/>.
    /// </summary>
    /// <param name="key">The signal key to send to.</param>
    /// <param name="value">The value to send.</param>
    /// <returns>The task returned by the underlying send.</returns>
    public virtual Task Broadcast(string key, object value)
    {
        return SendMessage(key, value);
    }

    /// <summary>
    /// Sends <paramref name="value"/> using this connection's own id as the signal key.
    /// </summary>
    /// <param name="value">The value to send.</param>
    /// <returns>The task returned by the underlying send.</returns>
    public Task Send(object value)
    {
        return SendMessage(_connectionId, value);
    }

    /// <summary>
    /// Requests messages from the bus for the current signals and groups, passing no message id.
    /// </summary>
    /// <param name="timeoutToken">Token forwarded to the bus.</param>
    /// <returns>A task producing the response built from the bus result.</returns>
    public Task<PersistentResponse> ReceiveAsync(CancellationToken timeoutToken)
    {
        return _messageBus.GetMessages(Signals, null, timeoutToken)
                          .Then(result => GetResponse(result));
    }

    /// <summary>
    /// Requests messages from the bus for the current signals and groups, passing
    /// <paramref name="messageId"/> through to the bus.
    /// </summary>
    /// <param name="messageId">Message id forwarded to the bus.</param>
    /// <param name="timeoutToken">Token forwarded to the bus.</param>
    /// <returns>A task producing the response built from the bus result.</returns>
    public Task<PersistentResponse> ReceiveAsync(string messageId, CancellationToken timeoutToken)
    {
        return _messageBus.GetMessages(Signals, messageId, timeoutToken)
                          .Then(result => GetResponse(result));
    }

    /// <summary>
    /// Sends <paramref name="command"/> on the key produced by
    /// <c>SignalCommand.AddCommandSuffix</c> for this connection's id.
    /// </summary>
    /// <param name="command">The command to send.</param>
    /// <returns>The task returned by the underlying send.</returns>
    public Task SendCommand(SignalCommand command)
    {
        return SendMessage(SignalCommand.AddCommandSuffix(_connectionId), command);
    }

    /// <summary>
    /// Builds the response for a bus result: applies any commands it contains, collects the
    /// remaining message values, attaches group state and traces the outcome.
    /// </summary>
    private PersistentResponse GetResponse(MessageResult result)
    {
        // Do a single sweep through the results to process commands and extract values.
        // This must run before the response is created: processing a Disconnect command sets
        // _disconnected, which is read below.
        var messageValues = ProcessResults(result.Messages);

        var response = new PersistentResponse
        {
            MessageId = result.LastMessageId,
            Messages = messageValues,
            Disconnect = _disconnected,
            TimedOut = result.TimedOut
        };

        PopulateResponseState(response);

        _trace.Source.TraceInformation("Connection: Connection {0} received {1} messages, last id {2}", _connectionId, result.Messages.Count, result.LastMessageId);

        return response;
    }

    /// <summary>
    /// Walks <paramref name="source"/> once: command messages are unwrapped and applied to this
    /// connection, all other messages are unwrapped and returned in their original order.
    /// </summary>
    private List<object> ProcessResults(IList<Message> source)
    {
        var messageValues = new List<object>();

        foreach (var message in source)
        {
            if (SignalCommand.IsCommand(message))
            {
                var command = WrappedValue.Unwrap<SignalCommand>(message.Value, _serializer);
                ProcessCommand(command);
            }
            else
            {
                messageValues.Add(WrappedValue.Unwrap(message.Value, _serializer));
            }
        }

        return messageValues;
    }

    /// <summary>
    /// Applies a command to this connection's state. Command types other than the three
    /// handled here are ignored.
    /// </summary>
    private void ProcessCommand(SignalCommand command)
    {
        switch (command.Type)
        {
            case CommandType.AddToGroup:
                _groups.Add((string)command.Value);
                break;
            case CommandType.RemoveFromGroup:
                _groups.Remove((string)command.Value);
                break;
            case CommandType.Disconnect:
                _disconnected = true;
                break;
        }
    }

    /// <summary>
    /// Wraps <paramref name="value"/> and sends it on the bus under <paramref name="key"/>,
    /// with this connection's id as the source.
    /// </summary>
    private Task SendMessage(string key, object value)
    {
        // NOTE(review): Catch() is an extension defined outside this file; presumably it
        // observes faults on the send task - confirm against its definition.
        return _messageBus.Send(_connectionId, key, new WrappedValue(value, _serializer)).Catch();
    }

    /// <summary>
    /// Adds the current group membership to the response's transport data when the
    /// connection belongs to at least one group.
    /// </summary>
    private void PopulateResponseState(PersistentResponse response)
    {
        // Set the groups on the outgoing transport data.
        if (_groups.Count > 0)
        {
            // Hand out a copy, not the private set: the response outlives this call, and
            // sharing the live instance would let later group commands change (or break the
            // enumeration of) a response that is still being consumed, and would let
            // consumers mutate this connection's membership.
            response.TransportData["Groups"] = new HashSet<string>(_groups);
        }
    }
}
}