/
Mailboxes.cs
378 lines (331 loc) · 16.8 KB
/
Mailboxes.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
//-----------------------------------------------------------------------
// <copyright file="Mailboxes.cs" company="Akka.NET Project">
// Copyright (C) 2009-2023 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Reflection;
using Akka.Actor;
using Akka.Annotations;
using Akka.Configuration;
using Akka.Dispatch.MessageQueues;
using Akka.Event;
using Akka.Util;
using Akka.Util.Internal;
namespace Akka.Dispatch
{
/// <summary>
/// Contains the directory of all <see cref="MailboxType"/>s registered and configured with a given <see cref="ActorSystem"/>.
/// </summary>
public class Mailboxes
{
/// <summary>
/// The actor system this mailbox directory belongs to. Its settings are exposed through
/// <see cref="Settings"/> and its event stream is used to publish warnings.
/// </summary>
private readonly ActorSystem _system;
// Mailbox built in the constructor from the system's dead letters reference; exposed via the DeadLetterMailbox property.
private readonly DeadLetterMailbox _deadLetterMailbox;
/// <summary>
/// The configuration path of the default mailbox (<c>akka.actor.default-mailbox</c>).
/// It is the last-resort lookup id in <see cref="GetMailboxType"/> and the final
/// fallback for every mailbox configuration produced by this class.
/// </summary>
public static readonly string DefaultMailboxId = "akka.actor.default-mailbox";
/// <summary>
/// The value of the <c>mailbox-requirement</c> setting that means "no requirement" (the empty string).
/// </summary>
public static readonly string NoMailboxRequirement = "";
// Maps a required message queue type to the id of the mailbox bound to it; filled by the constructor
// from the "requirements" section of "akka.actor.mailbox".
private readonly Dictionary<Type, string> _mailboxBindings;
// The configuration section found at DefaultMailboxId; used as the last fallback when building a mailbox's Config.
private readonly Config _defaultMailboxConfig;
// Cache of already-instantiated mailbox types, keyed by mailbox id.
private readonly ConcurrentDictionary<string, MailboxType> _mailboxTypeConfigurators = new();
// Shortcut to the settings of the owning actor system.
private Settings Settings => _system.Settings;
/// <summary>
/// Initializes a new instance of the <see cref="Mailboxes" /> class, reading the
/// queue-type-to-mailbox bindings from the <c>requirements</c> section of <c>akka.actor.mailbox</c>.
/// </summary>
/// <param name="system">The actor system whose configuration describes the mailboxes.</param>
/// <exception cref="ConfigurationException">
/// Thrown (via <c>ConfigurationException.NullOrEmptyConfig</c>) when the <c>akka.actor.mailbox</c> section is null or empty.
/// </exception>
public Mailboxes(ActorSystem system)
{
    _system = system;
    _deadLetterMailbox = new DeadLetterMailbox(system.DeadLetters);

    var mailboxSection = system.Settings.Config.GetConfig("akka.actor.mailbox");
    if (mailboxSection.IsNullOrEmpty())
    {
        throw ConfigurationException.NullOrEmptyConfig<Mailboxes>("akka.actor.mailbox");
    }

    var bindings = mailboxSection.GetConfig("requirements").AsEnumerable().ToList();
    _mailboxBindings = new Dictionary<Type, string>();
    foreach (var binding in bindings)
    {
        var queueType = Type.GetType(binding.Key);
        if (queueType != null)
        {
            _mailboxBindings.Add(queueType, binding.Value.GetString());
        }
        else
        {
            // A key that does not resolve to a type is reported as a warning and skipped.
            Warn($"Mailbox Requirement mapping [{binding.Key}] is not an actual type");
        }
    }

    _defaultMailboxConfig = Settings.Config.GetConfig(DefaultMailboxId);
    _defaultStashCapacity = StashCapacityFromConfig(Dispatchers.DefaultDispatcherId, DefaultMailboxId);
}
/// <summary>
/// Gets the mailbox that was created from the system's dead letters reference
/// when this instance was constructed.
/// </summary>
public DeadLetterMailbox DeadLetterMailbox => _deadLetterMailbox;
/// <summary>
/// Determines whether the given actor type declares a message queue requirement,
/// i.e. implements a closed form of <see cref="IRequiresMessageQueue{T}"/>.
/// </summary>
/// <param name="actorType">The actor type to inspect.</param>
/// <returns><c>true</c> if the type implements <see cref="IRequiresMessageQueue{T}"/>; otherwise <c>false</c>.</returns>
public bool HasRequiredType(Type actorType)
{
    foreach (var candidate in actorType.GetInterfaces())
    {
        if (!candidate.IsGenericType)
        {
            continue;
        }

        if (candidate.GetGenericTypeDefinition() == RequiresMessageQueueGenericType)
        {
            return true;
        }
    }

    return false;
}
/// <summary>
/// Determines whether the given type implements a closed form of
/// <see cref="IProducesMessageQueue{TQueue}"/>.
/// </summary>
/// <param name="mailboxType">The type of the <see cref="MailboxType"/> to inspect.</param>
/// <returns><c>true</c> if the type implements <see cref="IProducesMessageQueue{TQueue}"/>; otherwise <c>false</c>.</returns>
public bool ProducesMessageQueue(Type mailboxType)
{
    return Array.Exists(
        mailboxType.GetInterfaces(),
        candidate => candidate.IsGenericType
                     && candidate.GetGenericTypeDefinition() == ProducesMessageQueueGenericType);
}
/// <summary>
/// Resolves the mailbox id bound to the given message queue type.
/// </summary>
/// <param name="queueType">The required message queue type.</param>
/// <exception cref="ConfigurationException">Thrown when no binding exists for <paramref name="queueType"/>.</exception>
/// <returns>The id of the mailbox bound to <paramref name="queueType"/>.</returns>
private string LookupId(Type queueType)
{
    if (_mailboxBindings.TryGetValue(queueType, out var mailboxId))
    {
        return mailboxId;
    }

    throw new ConfigurationException($"Mailbox Mapping for [{queueType}] not configured");
}
/// <summary>
/// Returns the <see cref="MailboxType"/> bound, in configuration, to the given message queue type.
/// </summary>
/// <param name="queueType">The message queue type the mailbox has to satisfy.</param>
/// <exception cref="ConfigurationException">
/// Thrown when no mapping is configured for <paramref name="queueType"/>, or when the mapped mailbox id
/// cannot be resolved by <see cref="Lookup"/>.
/// </exception>
/// <returns>The <see cref="MailboxType"/> registered for the mailbox id mapped to <paramref name="queueType"/>.</returns>
public MailboxType LookupByQueueType(Type queueType) => Lookup(LookupId(queueType));
/// <summary>
/// Returns the <see cref="MailboxType"/> configured under the given mailbox id.
/// </summary>
/// <param name="id">The id (configuration path) of the mailbox to look up.</param>
/// <exception cref="ConfigurationException">
/// Thrown when the mailbox id is not configured, its <c>mailbox-type</c> setting is empty,
/// or the configured type cannot be found.
/// </exception>
/// <exception cref="ArgumentException">
/// Thrown when the configured mailbox type cannot be instantiated.
/// </exception>
/// <returns>The <see cref="MailboxType"/> for <paramref name="id"/>.</returns>
public MailboxType Lookup(string id)
{
    return LookupConfigurator(id);
}
// Flags that suppress repeated warnings once each has been published.
// They are plain, unsynchronized fields: a warning may occasionally be published
// more than once, and we don't care if that happens.
private bool _mailboxSizeWarningIssued = false;
private bool _mailboxNonZeroPushTimeoutWarningIssued = false;
/// <summary>
/// Resolves the <see cref="MailboxType"/> for the given mailbox id, instantiating and caching it on first use.
/// The ids <c>unbounded</c> and <c>bounded</c> are built in; any other id must be a configuration path
/// containing a <c>mailbox-type</c> setting.
/// </summary>
/// <param name="id">The id (configuration path) of the mailbox.</param>
/// <exception cref="ConfigurationException">
/// Thrown when <paramref name="id"/> is not configured, its <c>mailbox-type</c> is empty,
/// or the named type cannot be found.
/// </exception>
/// <exception cref="ArgumentException">Thrown when the mailbox type cannot be instantiated.</exception>
/// <returns>The cached <see cref="MailboxType"/>; every caller receives the same instance for a given id.</returns>
private MailboxType LookupConfigurator(string id)
{
    if (_mailboxTypeConfigurators.TryGetValue(id, out var configurator))
    {
        return configurator;
    }

    // It doesn't matter if we create a mailbox type configurator that isn't used due to concurrent lookup.
    if (id.Equals("unbounded"))
    {
        configurator = new UnboundedMailbox();
    }
    else if (id.Equals("bounded"))
    {
        configurator = new BoundedMailbox(Settings, Config(id));
    }
    else
    {
        if (!Settings.Config.HasPath(id))
        {
            throw new ConfigurationException($"Mailbox Type [{id}] not configured");
        }

        var conf = Config(id);
        var mailboxTypeName = conf.GetString("mailbox-type", null);
        if (string.IsNullOrEmpty(mailboxTypeName))
        {
            throw new ConfigurationException($"The setting mailbox-type defined in [{id}] is empty");
        }

        var mailboxType = Type.GetType(mailboxTypeName)
                          ?? throw new ConfigurationException($"Found mailbox-type [{mailboxTypeName}] in configuration for [{id}], but could not find that type in any loaded assemblies.");
        var args = new object[] { Settings, conf };
        try
        {
            configurator = (MailboxType)Activator.CreateInstance(mailboxType, args);
            if (!_mailboxNonZeroPushTimeoutWarningIssued
                && configurator is IProducesPushTimeoutSemanticsMailbox m
                && m.PushTimeout.Ticks > 0L)
            {
                Warn($"Configured potentially-blocking mailbox [{id}] configured with non-zero PushTimeOut ({m.PushTimeout}), " +
                     "which can lead to blocking behavior when sending messages to this mailbox. " +
                     $"Avoid this by setting `{id}.mailbox-push-timeout-time` to `0`.");
                _mailboxNonZeroPushTimeoutWarningIssued = true;
            }
        }
        catch (Exception ex)
        {
            throw new ArgumentException($"Cannot instantiate MailboxType {mailboxType}, defined in [{id}]. Make sure it has a public " +
                                        "constructor with [Akka.Actor.Settings, Akka.Configuration.Config] parameters", ex);
        }
    }

    // Publish the new configurator, or keep the existing one if another thread added it first.
    // Returning the result of GetOrAdd (rather than the local instance) guarantees that all
    // callers observe the same MailboxType for a given id, even under a concurrent first lookup.
    return _mailboxTypeConfigurators.GetOrAdd(id, configurator);
}
/// <summary>
/// INTERNAL API
/// <para>
/// Builds the effective configuration of a mailbox: an <c>id</c> entry, backed by the section
/// found at <paramref name="id"/>, backed by the default mailbox configuration.
/// </para>
/// </summary>
/// <param name="id">The id of the mailbox whose config we're going to generate.</param>
/// <returns>A <see cref="Config"/> object for the mailbox with <paramref name="id"/></returns>
private Config Config(string id)
{
    var idEntry = ConfigurationFactory.ParseString($"id:{id}");
    var mailboxSection = Settings.Config.GetConfig(id);
    return idEntry.WithFallback(mailboxSection).WithFallback(_defaultMailboxConfig);
}
// Open generic definition used to recognise IRequiresMessageQueue<T> among an actor type's interfaces.
private static readonly Type RequiresMessageQueueGenericType = typeof (IRequiresMessageQueue<>);
/// <summary>
/// Gets the message queue type that the given actor type requires through
/// <see cref="IRequiresMessageQueue{T}"/>.
/// </summary>
/// <param name="actorType">The actor type to inspect.</param>
/// <returns>
/// The generic argument of the first <see cref="IRequiresMessageQueue{T}"/> interface found on
/// <paramref name="actorType"/>, or <c>null</c> if the type implements no such interface.
/// </returns>
public Type GetRequiredType(Type actorType)
{
    foreach (var candidate in actorType.GetInterfaces())
    {
        var isRequirement = candidate.IsGenericType
                            && candidate.GetGenericTypeDefinition() == RequiresMessageQueueGenericType;
        if (isRequirement)
        {
            return candidate.GetGenericArguments()[0];
        }
    }

    return null;
}
// Open generic definition used to recognise IProducesMessageQueue<TQueue> among a mailbox type's interfaces.
private static readonly Type ProducesMessageQueueGenericType = typeof (IProducesMessageQueue<>);
/// <summary>
/// Gets the message queue type that the given mailbox type produces, as declared through
/// <see cref="IProducesMessageQueue{TQueue}"/>.
/// </summary>
/// <param name="mailboxType">The mailbox type instance to inspect.</param>
/// <exception cref="ArgumentException">
/// Thrown when the runtime type of <paramref name="mailboxType"/> does not implement
/// <see cref="IProducesMessageQueue{TQueue}"/>.
/// </exception>
/// <returns>The generic argument of the first <see cref="IProducesMessageQueue{TQueue}"/> interface found.</returns>
private Type GetProducedMessageQueueType(MailboxType mailboxType)
{
    foreach (var candidate in mailboxType.GetType().GetInterfaces())
    {
        if (candidate.IsGenericType && candidate.GetGenericTypeDefinition() == ProducesMessageQueueGenericType)
        {
            return candidate.GetGenericArguments()[0];
        }
    }

    // ArgumentException takes (message, paramName) in that order; passing them the other way
    // round would make the exception message read just "mailboxType".
    throw new ArgumentException(
        $"No IProducesMessageQueue<TQueue> supplied for {mailboxType}; illegal mailbox type definition.",
        nameof(mailboxType));
}
/// <summary>
/// Reads the <c>mailbox-requirement</c> setting of the given configuration and resolves it to a type.
/// </summary>
/// <param name="config">The configuration to read the requirement from.</param>
/// <returns>
/// <see cref="IMessageQueue"/> when the setting is absent or equal to <see cref="NoMailboxRequirement"/>;
/// otherwise the type named by the setting (type resolution throws if it cannot be loaded).
/// </returns>
private Type GetMailboxRequirement(Config config)
{
    var requirementName = config.GetString("mailbox-requirement", null);
    if (requirementName == null || requirementName.Equals(NoMailboxRequirement))
    {
        return typeof (IMessageQueue);
    }

    return Type.GetType(requirementName, true);
}
/// <summary>
/// Selects the <see cref="MailboxType"/> for an actor. The first applicable rule wins:
/// <list type="number">
/// <item><description>the mailbox named in the actor's deployment;</description></item>
/// <item><description>the dispatcher's own <c>mailbox-type</c>, when a dispatcher is named in the deployment;</description></item>
/// <item><description>the mailbox bound to the queue type required by the actor type
/// (falling back to the dispatcher's requirement if that lookup fails);</description></item>
/// <item><description>the mailbox bound to the dispatcher's <c>mailbox-requirement</c>;</description></item>
/// <item><description>the default mailbox.</description></item>
/// </list>
/// The selected mailbox is always verified against the dispatcher's and the actor's requirements.
/// </summary>
/// <param name="props">The props of the actor; supplies the deployment and the actor type.</param>
/// <param name="dispatcherConfig">The configuration of the actor's dispatcher; <c>null</c> is treated as an empty configuration.</param>
/// <exception cref="ArgumentException">
/// This exception is thrown if the 'mailbox-requirement' in the given <paramref name="dispatcherConfig"/>,
/// or the queue requirement of the actor type, isn't met by the selected mailbox.
/// </exception>
/// <returns>The verified <see cref="MailboxType"/> to use for the actor.</returns>
public MailboxType GetMailboxType(Props props, Config dispatcherConfig)
{
    if (dispatcherConfig == null)
    {
        dispatcherConfig = ConfigurationFactory.Empty;
    }

    var dispatcherId = dispatcherConfig.GetString("id", null);
    var deployment = props.Deploy;
    var actorClass = props.Type;

    // Resolved lazily: the actor's requirement is only inspected by the rules that need it.
    var requiredByActor = new Lazy<Type>(() => GetRequiredType(actorClass));
    var requiredByDispatcher = GetMailboxRequirement(dispatcherConfig);
    var dispatcherHasRequirement = requiredByDispatcher != typeof(IMessageQueue);

    var dispatcherHasMailboxType =
        dispatcherConfig.HasPath("mailbox-type") &&
        dispatcherConfig.GetString("mailbox-type", null) != Deploy.NoMailboxGiven;

    if (!dispatcherHasMailboxType && !_mailboxSizeWarningIssued && dispatcherConfig.HasPath("mailbox-size"))
    {
        Warn($"Ignoring setting 'mailbox-size for dispatcher [{dispatcherId}], you need to specify 'mailbox-type=bounded`");
        _mailboxSizeWarningIssued = true;
    }

    // Returns the candidate unchanged if it satisfies both requirements, otherwise throws.
    MailboxType Checked(MailboxType candidate)
    {
        // Lazy on purpose: the produced queue type is only resolved when a requirement has to be checked.
        var producedQueue = new Lazy<Type>(() => GetProducedMessageQueueType(candidate));

        if (dispatcherHasRequirement && !requiredByDispatcher.IsAssignableFrom(producedQueue.Value))
        {
            throw new ArgumentException(
                $"produced message queue type [{producedQueue.Value}] does not fulfill requirement for dispatcher [{dispatcherId}]." +
                $"Must be a subclass of [{requiredByDispatcher}]");
        }

        if (HasRequiredType(actorClass) && !requiredByActor.Value.IsAssignableFrom(producedQueue.Value))
        {
            throw new ArgumentException(
                $"produced message queue type of [{producedQueue.Value}] does not fulfill requirement for actor class [{actorClass}]." +
                $"Must be a subclass of [{requiredByActor.Value}]");
        }

        return candidate;
    }

    // 1. A mailbox named explicitly in the deployment.
    if (!deployment.Mailbox.Equals(Deploy.NoMailboxGiven))
    {
        return Checked(Lookup(deployment.Mailbox));
    }

    // 2. A dispatcher named in the deployment that carries its own mailbox-type.
    if (!deployment.Dispatcher.Equals(Deploy.NoDispatcherGiven) && dispatcherHasMailboxType)
    {
        return Checked(Lookup(dispatcherId));
    }

    // 3. The queue type required by the actor type.
    if (requiredByActor.Value != null)
    {
        try
        {
            return Checked(LookupByQueueType(requiredByActor.Value));
        }
        catch (Exception)
        {
            // Fall back to the dispatcher's requirement, if there is one; otherwise surface the failure.
            if (dispatcherHasRequirement)
            {
                return Checked(LookupByQueueType(requiredByDispatcher));
            }

            throw;
        }
    }

    // 4. The queue type required by the dispatcher.
    if (dispatcherHasRequirement)
    {
        return Checked(LookupByQueueType(requiredByDispatcher));
    }

    // 5. Nothing specific was requested.
    return Checked(Lookup(DefaultMailboxId));
}
/// <summary>
/// Publishes a <see cref="Warning"/> with log source "mailboxes" to the system's event stream.
/// </summary>
/// <param name="msg">The warning text.</param>
private void Warn(string msg)
{
    var eventStream = _system.EventStream;
    eventStream.Publish(new Warning("mailboxes", GetType(), msg));
}
// Cache of stash capacities keyed by "{dispatcher}-{mailbox}". The dictionary is immutable and is
// replaced as a whole through compare-and-set on the reference (see StashCapacity).
private readonly AtomicReference<ImmutableDictionary<string, int>> _stashCapacityCache =
new(ImmutableDictionary<string, int>.Empty);
// Stash capacity for the default dispatcher combined with the default mailbox; computed once in the constructor.
private readonly int _defaultStashCapacity;
/// <summary>
/// INTERNAL API
/// <para>
/// The capacity of the stash. Configured in the actor's mailbox or dispatcher config.
/// </para>
/// <para>
/// The value for the default dispatcher/default mailbox pair is precomputed; every other pair is
/// read from configuration on first request and cached afterwards.
/// </para>
/// </summary>
/// <param name="dispatcher">The id of the dispatcher.</param>
/// <param name="mailbox">The id of the mailbox.</param>
/// <returns>The configured <c>stash-capacity</c> for the given pair.</returns>
[InternalApi]
public int StashCapacity(string dispatcher, string mailbox)
{
    if (dispatcher == Dispatchers.DefaultDispatcherId && mailbox == DefaultMailboxId)
    {
        return _defaultStashCapacity;
    }

    var snapshot = _stashCapacityCache.Value;
    var cacheKey = $"{dispatcher}-{mailbox}";
    if (snapshot.TryGetValue(cacheKey, out var capacity))
    {
        return capacity;
    }

    capacity = StashCapacityFromConfig(dispatcher, mailbox);

    // Swap in a dictionary containing the new entry; if the reference changed in the
    // meantime, retry against the current dictionary until the swap succeeds.
    while (!_stashCapacityCache.CompareAndSet(snapshot, snapshot.SetItem(cacheKey, capacity)))
    {
        snapshot = _stashCapacityCache.Value;
    }

    return capacity;
}
/// <summary>
/// Reads <c>stash-capacity</c> from configuration. The lookup order is: the mailbox section
/// (unless the mailbox is the default one), then the dispatcher section, then the default mailbox section.
/// </summary>
/// <param name="dispatcher">The id of the dispatcher.</param>
/// <param name="mailbox">The id of the mailbox.</param>
/// <returns>The configured stash capacity.</returns>
private int StashCapacityFromConfig(string dispatcher, string mailbox)
{
    var dispatcherSection = Dispatchers.GetConfig(Settings.Config, dispatcher);
    var defaults = dispatcherSection.WithFallback(Settings.Config.GetConfig(DefaultMailboxId));

    if (mailbox == DefaultMailboxId)
    {
        return defaults.GetInt("stash-capacity");
    }

    var mailboxSection = Settings.Config.GetConfig(mailbox).WithFallback(defaults);
    return mailboxSection.GetInt("stash-capacity");
}
}
}