-
Notifications
You must be signed in to change notification settings - Fork 2k
/
ObserverManager.cs
317 lines (291 loc) · 10.8 KB
/
ObserverManager.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using System.Collections;
using System.Collections.Generic;
using Orleans.Runtime;
using System.Linq;
namespace Orleans.Utilities
{
/// <summary>
/// An observer collection whose observers are identified by <see cref="IAddressable"/>.
/// </summary>
/// <typeparam name="TObserver">
/// The type of the observers held by this manager.
/// </typeparam>
public class ObserverManager<TObserver> : ObserverManager<IAddressable, TObserver>
{
    /// <summary>
    /// Initializes a new instance of the <see cref="ObserverManager{TObserver}"/> class.
    /// </summary>
    /// <param name="expiration">
    /// The expiration time span, forwarded unchanged to the base class.
    /// </param>
    /// <param name="log">The logger, forwarded unchanged to the base class.</param>
    public ObserverManager(TimeSpan expiration, ILogger log)
        : base(expiration, log)
    {
        // All state and behavior live in the base class.
    }
}
/// <summary>
/// Maintains a collection of observers, keyed by identity.
/// </summary>
/// <remarks>
/// Every call to <see cref="Subscribe"/> stamps the entry with the current time. Entries whose
/// stamp is older than <see cref="ExpirationDuration"/> are removed lazily, either while
/// notifying or by an explicit call to <see cref="ClearExpired"/>.
/// NOTE(review): this type performs no synchronization of its own; callers are presumably
/// expected to serialize access to an instance - confirm against usage.
/// </remarks>
/// <typeparam name="TIdentity">
/// The address type, used to identify observers.
/// </typeparam>
/// <typeparam name="TObserver">
/// The observer type.
/// </typeparam>
public class ObserverManager<TIdentity, TObserver> : IEnumerable<TObserver>
{
    /// <summary>
    /// The observers, keyed by identity.
    /// </summary>
    private readonly Dictionary<TIdentity, ObserverEntry> _observers = new();

    /// <summary>
    /// The log.
    /// </summary>
    private readonly ILogger _log;

    /// <summary>
    /// Initializes a new instance of the <see cref="ObserverManager{TIdentity,TObserver}"/> class.
    /// </summary>
    /// <param name="expiration">
    /// The time span after which an observer which has not re-subscribed is considered expired.
    /// </param>
    /// <param name="log">The log.</param>
    public ObserverManager(TimeSpan expiration, ILogger log)
    {
        ExpirationDuration = expiration;
        _log = log;
        GetDateTime = () => DateTime.UtcNow;
    }

    /// <summary>
    /// Gets or sets the delegate used to get the date and time, for expiry.
    /// Defaults to a delegate returning <see cref="DateTime.UtcNow"/>.
    /// </summary>
    public Func<DateTime> GetDateTime { get; set; }

    /// <summary>
    /// Gets or sets the expiration time span, after which observers are lazily removed.
    /// </summary>
    public TimeSpan ExpirationDuration { get; set; }

    /// <summary>
    /// Gets the number of observers, including expired observers which have not yet been removed.
    /// </summary>
    public int Count => _observers.Count;

    /// <summary>
    /// Gets a copy of the observers. Changes to the returned dictionary do not affect this instance.
    /// </summary>
    public IDictionary<TIdentity, TObserver> Observers
    {
        get
        {
            return _observers.ToDictionary(pair => pair.Key, pair => pair.Value.Observer);
        }
    }

    /// <summary>
    /// Removes all observers.
    /// </summary>
    public void Clear() => _observers.Clear();

    /// <summary>
    /// Ensures that the provided <paramref name="observer"/> is subscribed, renewing its subscription.
    /// </summary>
    /// <param name="id">
    /// The observer's identity.
    /// </param>
    /// <param name="observer">
    /// The observer.
    /// </param>
    /// <exception cref="Exception">A delegate callback throws an exception.</exception>
    public void Subscribe(TIdentity id, TObserver observer)
    {
        // Add or update the subscription.
        var now = GetDateTime();
        if (_observers.TryGetValue(id, out var entry))
        {
            // Renewal: refresh the timestamp and replace the observer reference.
            entry.LastSeen = now;
            entry.Observer = observer;
            if (_log.IsEnabled(LogLevel.Debug))
            {
                _log.LogDebug("Updating entry for {Id}/{Observer}. {Count} total observers.", id, observer, _observers.Count);
            }
        }
        else
        {
            _observers[id] = new ObserverEntry { LastSeen = now, Observer = observer };
            if (_log.IsEnabled(LogLevel.Debug))
            {
                _log.LogDebug("Adding entry for {Id}/{Observer}. {Count} total observers after add.", id, observer, _observers.Count);
            }
        }
    }

    /// <summary>
    /// Ensures that the provided <paramref name="id"/> is unsubscribed.
    /// </summary>
    /// <param name="id">
    /// The observer's identity.
    /// </param>
    public void Unsubscribe(TIdentity id)
    {
        // Remove before logging so that the reported count really is the count after the removal.
        _observers.Remove(id, out _);
        if (_log.IsEnabled(LogLevel.Debug))
        {
            _log.LogDebug("Removed entry for {Id}. {Count} total observers after remove.", id, _observers.Count);
        }
    }

    /// <summary>
    /// Notifies all observers.
    /// </summary>
    /// <remarks>
    /// Observers are notified one at a time: each notification is awaited before the next one starts.
    /// Expired observers are skipped, and both expired and failing observers are removed once all
    /// notifications have completed.
    /// NOTE(review): the underlying dictionary is enumerated across the awaits, so subscribing a new
    /// identity while a notification is pending is expected to invalidate the enumeration - confirm
    /// that callers never interleave such calls.
    /// </remarks>
    /// <param name="notification">
    /// The notification delegate to call on each observer.
    /// </param>
    /// <param name="predicate">
    /// The predicate used to select observers to notify, or <see langword="null"/> to notify all of them.
    /// </param>
    /// <returns>
    /// A <see cref="Task"/> representing the work performed.
    /// </returns>
    public async Task Notify(Func<TObserver, Task> notification, Func<TObserver, bool> predicate = null)
    {
        var now = GetDateTime();
        var defunct = default(List<TIdentity>);
        foreach (var observer in _observers)
        {
            if (IsExpired(observer.Value, now))
            {
                // Expired observers will be removed.
                (defunct ??= new List<TIdentity>()).Add(observer.Key);
                continue;
            }

            // Skip observers which don't match the provided predicate.
            if (predicate != null && !predicate(observer.Value.Observer))
            {
                continue;
            }

            try
            {
                await notification(observer.Value.Observer);
            }
            catch (Exception exception)
            {
                // Failing observers are considered defunct and will be removed.
                // The failure is deliberately not rethrown so that the remaining observers are still notified.
                if (_log.IsEnabled(LogLevel.Debug))
                {
                    _log.LogDebug(exception, "Notification failed for {Id}. The entry will be removed.", observer.Key);
                }

                (defunct ??= new List<TIdentity>()).Add(observer.Key);
            }
        }

        RemoveDefunct(defunct);
    }

    /// <summary>
    /// Notifies all observers which match the provided <paramref name="predicate"/>.
    /// </summary>
    /// <remarks>
    /// Expired observers are skipped, and both expired and failing observers are removed once all
    /// notifications have completed.
    /// </remarks>
    /// <param name="notification">
    /// The notification delegate to call on each observer.
    /// </param>
    /// <param name="predicate">
    /// The predicate used to select observers to notify, or <see langword="null"/> to notify all of them.
    /// </param>
    public void Notify(Action<TObserver> notification, Func<TObserver, bool> predicate = null)
    {
        var now = GetDateTime();
        var defunct = default(List<TIdentity>);
        foreach (var observer in _observers)
        {
            if (IsExpired(observer.Value, now))
            {
                // Expired observers will be removed.
                (defunct ??= new List<TIdentity>()).Add(observer.Key);
                continue;
            }

            // Skip observers which don't match the provided predicate.
            if (predicate != null && !predicate(observer.Value.Observer))
            {
                continue;
            }

            try
            {
                notification(observer.Value.Observer);
            }
            catch (Exception exception)
            {
                // Failing observers are considered defunct and will be removed.
                // The failure is deliberately not rethrown so that the remaining observers are still notified.
                if (_log.IsEnabled(LogLevel.Debug))
                {
                    _log.LogDebug(exception, "Notification failed for {Id}. The entry will be removed.", observer.Key);
                }

                (defunct ??= new List<TIdentity>()).Add(observer.Key);
            }
        }

        RemoveDefunct(defunct);
    }

    /// <summary>
    /// Removes all expired observers.
    /// </summary>
    public void ClearExpired()
    {
        var now = GetDateTime();
        var defunct = default(List<TIdentity>);
        foreach (var observer in _observers)
        {
            if (IsExpired(observer.Value, now))
            {
                // Collected first: the dictionary is not modified while it is being enumerated.
                (defunct ??= new List<TIdentity>()).Add(observer.Key);
            }
        }

        // Remove defunct observers.
        if (defunct is { Count: > 0 })
        {
            _log.LogInformation("Removing {Count} defunct observers entries.", defunct.Count);
            foreach (var observer in defunct)
            {
                _observers.Remove(observer, out _);
            }
        }
    }

    /// <summary>
    /// Returns an enumerator that iterates through the collection.
    /// </summary>
    /// <returns>
    /// A <see cref="T:System.Collections.Generic.IEnumerator`1"/> that can be used to iterate through the collection.
    /// </returns>
    public IEnumerator<TObserver> GetEnumerator() => _observers.Select(observer => observer.Value.Observer).GetEnumerator();

    /// <summary>
    /// Returns an enumerator that iterates through a collection.
    /// </summary>
    /// <returns>
    /// An <see cref="T:System.Collections.IEnumerator"/> object that can be used to iterate through the collection.
    /// </returns>
    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();

    /// <summary>
    /// Determines whether <paramref name="entry"/> was last seen more than <see cref="ExpirationDuration"/> before <paramref name="now"/>.
    /// </summary>
    private bool IsExpired(ObserverEntry entry, DateTime now) => entry.LastSeen + ExpirationDuration < now;

    /// <summary>
    /// Removes the observers identified by <paramref name="defunct"/>, which may be <see langword="null"/> when there is nothing to remove.
    /// </summary>
    private void RemoveDefunct(List<TIdentity> defunct)
    {
        if (defunct is null)
        {
            return;
        }

        foreach (var id in defunct)
        {
            _observers.Remove(id, out _);
            if (_log.IsEnabled(LogLevel.Debug))
            {
                _log.LogDebug("Removing defunct entry for {Observer}. {Count} total observers after remove.", id, _observers.Count);
            }
        }
    }

    /// <summary>
    /// An observer entry.
    /// </summary>
    private sealed class ObserverEntry
    {
        /// <summary>
        /// Gets or sets the observer.
        /// </summary>
        public TObserver Observer { get; set; }

        /// <summary>
        /// Gets or sets the time at which the observer last subscribed, as returned by the <see cref="GetDateTime"/> delegate.
        /// </summary>
        public DateTime LastSeen { get; set; }
    }
}
}