/
CommandInterpreter.cs
132 lines (117 loc) · 6.55 KB
/
CommandInterpreter.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
using System.Collections.Frozen;
using System.Diagnostics.CodeAnalysis;
using System.Reflection;
namespace DotNext.Net.Cluster.Consensus.Raft.Commands;
using IO.Log;
using Runtime.Serialization;
using static Reflection.MethodExtensions;
using static Runtime.Intrinsics;
/// <summary>
/// Represents interpreter of the log entries.
/// </summary>
/// <remarks>
/// The interpreter can be constructed in two ways: using <see cref="CommandInterpreter.Builder"/>
/// and through inheritance. If you choose the inheritance then command handlers must be declared
/// as public instance methods marked with <see cref="CommandInterpreter.CommandHandlerAttribute"/> attribute.
/// Otherwise, command handlers can be registered through the builder.
/// Typically, the interpreter is aggregated by the class derived from <see cref="PersistentState"/>.
/// All command types must be registered using <see cref="CommandAttribute{TCommand}"/> attributes
/// applied to the derived type.
/// </remarks>
public partial class CommandInterpreter : Disposable
{
    /// <summary>
    /// Indicates that the command cannot be decoded correctly.
    /// </summary>
    public sealed class UnknownCommandException : IntegrityException
    {
        internal UnknownCommandException(int id)
            : base(ExceptionMessages.UnknownCommand(id))
        {
            CommandId = id;
        }

        /// <summary>
        /// Gets ID of the unrecognized command.
        /// </summary>
        public int CommandId { get; }
    }

    // Maps command identifier to the handler that interprets it.
    private readonly IHandlerRegistry interpreters;

    // Maps command type to its identifier; consulted when wrapping a command into a log entry.
    private readonly IReadOnlyDictionary<Type, int> identifiers;

    // Identifier of the command whose handler is used for snapshot entries; null if there is none.
    private readonly int? snapshotCommandId;

    /// <summary>
    /// Initializes a new interpreter and discovers methods marked
    /// with <see cref="CommandHandlerAttribute"/> attribute.
    /// </summary>
    /// <remarks>
    /// A public instance method is registered as a handler only if it is marked with
    /// <see cref="CommandHandlerAttribute"/>, returns <see cref="ValueTask"/>, and accepts exactly
    /// two parameters: a value-type command followed by <see cref="CancellationToken"/>.
    /// The command type must also be registered on the derived type through a command attribute.
    /// Methods that do not satisfy these conditions are silently skipped.
    /// </remarks>
    [DynamicDependency(DynamicallyAccessedMemberTypes.PublicConstructors, typeof(CommandHandler<>))]
    [DynamicDependency(DynamicallyAccessedMemberTypes.All, typeof(Func<,,>))] // must match the delegate type constructed below
    [RequiresUnreferencedCode("Dynamic code generation may be incompatible with IL trimming")]
    protected CommandInterpreter()
    {
        // explore command types
        var identifiers = GetType()
            .GetCustomAttributes<CommandAttribute>(true)
            .ToDictionary(static attr => attr.CommandType, static attr => attr.Id);

        // register interpreters
        var interpreters = new Dictionary<int, CommandHandler>();
        const BindingFlags publicInstanceMethod = BindingFlags.Public | BindingFlags.Instance;
        foreach (var method in GetType().GetMethods(publicInstanceMethod))
        {
            var handlerAttr = method.GetCustomAttribute<CommandHandlerAttribute>();
            if (handlerAttr is null || method.ReturnType != typeof(ValueTask))
                continue;

            // expected shape: ValueTask Handler(TCommand command, CancellationToken token)
            // NOTE(review): only value-type commands are accepted here, while CreateLogEntry
            // constrains TCommand to 'notnull' only - confirm that this restriction is intended.
            var parameters = method.GetParameterTypes();
            if (parameters.GetLength() is not 2 || !parameters[0].IsValueType || parameters[1] != typeof(CancellationToken))
                continue;

            // handlers for command types without a registered identifier are ignored
            var commandType = parameters[0];
            if (!identifiers.TryGetValue(commandType, out var commandId))
                continue;

            // bind the method to Func<TCommand, CancellationToken, ValueTask> and wrap it into CommandHandler<TCommand>
            var interpreter = Delegate.CreateDelegate(typeof(Func<,,>).MakeGenericType(commandType, typeof(CancellationToken), typeof(ValueTask)), method.IsStatic ? null : this, method);
            interpreters.Add(commandId, Cast<CommandHandler>(Activator.CreateInstance(typeof(CommandHandler<>).MakeGenericType(commandType), interpreter)));

            if (handlerAttr.IsSnapshotHandler)
                snapshotCommandId = commandId;
        }

        this.interpreters = CreateRegistry(interpreters);
        interpreters.Clear();

        identifiers.TrimExcess();
        this.identifiers = identifiers;
    }

    // Initializes the interpreter from already prepared handlers and command identifiers.
    private CommandInterpreter(IDictionary<int, CommandHandler> interpreters, IDictionary<Type, int> identifiers, int? snapshotCommandId)
    {
        this.interpreters = CreateRegistry(interpreters);
        this.identifiers = identifiers.ToFrozenDictionary();
        this.snapshotCommandId = snapshotCommandId;
    }

    /// <summary>
    /// Wraps the command to the log entry.
    /// </summary>
    /// <param name="command">The payload of the command.</param>
    /// <param name="term">The term of the local node.</param>
    /// <typeparam name="TCommand">The type of the command.</typeparam>
    /// <returns>The instance of the log entry containing the command.</returns>
    /// <exception cref="GenericArgumentException"><typeparamref name="TCommand"/> is not registered with <see cref="CommandAttribute{TCommand}"/>.</exception>
    public LogEntry<TCommand> CreateLogEntry<TCommand>(TCommand command, long term)
        where TCommand : notnull, ISerializable<TCommand>
        => identifiers.TryGetValue(typeof(TCommand), out var id) ?
            new() { Term = term, Command = command, CommandId = id } :
            throw new GenericArgumentException<TCommand>(ExceptionMessages.MissingCommandId, nameof(command));

    // Resolves the command identifier for the entry: snapshot entries use the identifier of the
    // snapshot handler, all other entries use their own CommandId. Returns false if the selected
    // identifier has no value.
    private bool TryGetCommandId<TEntry>(ref TEntry entry, out int commandId)
        where TEntry : struct, IRaftLogEntry
        => (entry.IsSnapshot ? snapshotCommandId : entry.CommandId).TryGetValue(out commandId);

    /// <summary>
    /// Interprets log entry asynchronously.
    /// </summary>
    /// <remarks>
    /// Typically this method is called by the custom implementation of
    /// <see cref="MemoryBasedStateMachine.ApplyAsync(PersistentState.LogEntry)"/> method.
    /// </remarks>
    /// <param name="entry">The log entry to be interpreted.</param>
    /// <param name="token">The token that can be used to cancel the interpretation.</param>
    /// <typeparam name="TEntry">The type of the log entry to be interpreted.</typeparam>
    /// <returns>The ID of the interpreted log entry.</returns>
    /// <exception cref="ArgumentException">
    /// The command identifier cannot be resolved for <paramref name="entry"/>: either the entry has no command identifier,
    /// or it is a snapshot and no snapshot handler is registered. The exception is reported through the returned task.
    /// </exception>
    /// <exception cref="UnknownCommandException">The command handler was not registered for the command represented by <paramref name="entry"/>.</exception>
    /// <exception cref="OperationCanceledException">The operation has been canceled.</exception>
    public ValueTask<int> InterpretAsync<TEntry>(TEntry entry, CancellationToken token = default)
        where TEntry : struct, IRaftLogEntry
        => TryGetCommandId(ref entry, out var id) ?
            entry.TransformAsync<int, InterpretingTransformation>(new InterpretingTransformation(id, interpreters), token) :
            ValueTask.FromException<int>(new ArgumentException(ExceptionMessages.MissingCommandId, nameof(entry)));
}