-
-
Notifications
You must be signed in to change notification settings - Fork 723
/
SubscribeAttribute.cs
170 lines (146 loc) · 5.18 KB
/
SubscribeAttribute.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
using System;
using System.Linq;
using System.Reflection;
using HotChocolate.Resolvers;
using HotChocolate.Subscriptions;
using HotChocolate.Types.Descriptors;
using HotChocolate.Types.Descriptors.Definitions;
using static System.Reflection.BindingFlags;
using static HotChocolate.Utilities.ThrowHelper;
#nullable enable
namespace HotChocolate.Types;
[AttributeUsage(AttributeTargets.Method)]
public sealed class SubscribeAttribute : ObjectFieldDescriptorAttribute
{
private static readonly MethodInfo _subscribeFactory =
typeof(SubscribeAttribute).GetMethod(nameof(SubscribeFactory), NonPublic | Static)!;
/// <summary>
/// The type of the message.
/// </summary>
public Type? MessageType { get; set; }
/// <summary>
/// The method that shall be used to subscribe to the pub/sub system.
/// </summary>
public string? With { get; set; }
public override void OnConfigure(
IDescriptorContext context,
IObjectFieldDescriptor descriptor,
MemberInfo member)
{
var method = (MethodInfo)member;
if (MessageType is null)
{
var messageParameter =
method.GetParameters()
.FirstOrDefault(t => t.IsDefined(typeof(EventMessageAttribute)));
if (messageParameter is null)
{
throw SubscribeAttribute_MessageTypeUnspecified(member);
}
MessageType = messageParameter.ParameterType;
}
if (string.IsNullOrEmpty(With))
{
var topicString = ResolveTopicString(method);
descriptor.Extend().OnBeforeNaming(
(_, fieldDef) =>
{
var factory = _subscribeFactory.MakeGenericMethod(MessageType);
factory.Invoke(null, new object[] { fieldDef, topicString });
});
}
else
{
descriptor.Extend().OnBeforeCreate(
d =>
{
var subscribeResolver = member.DeclaringType?.GetMethod(
With!,
Public | NonPublic | Instance | Static);
if (subscribeResolver is null)
{
throw SubscribeAttribute_SubscribeResolverNotFound(member, With);
}
d.SubscribeResolver = context.ResolverCompiler.CompileSubscribe(
subscribeResolver,
d.SourceType!,
d.ResolverType);
});
}
}
private static string ResolveTopicString(MethodInfo method)
{
if (method.IsDefined(typeof(TopicAttribute)))
{
return method.GetCustomAttribute<TopicAttribute>()?.Name ?? method.Name;
}
return method.Name;
}
private static void SubscribeFactory<TMessage>(
ObjectFieldDefinition fieldDef,
string topicString)
{
var arg = false;
if (topicString.Contains('{'))
{
for (var i = 0; i < fieldDef.Arguments.Count; i++)
{
var argument = fieldDef.Arguments[i];
var argumentPlaceholder = $"{{{argument.Name}}}";
if (topicString.Contains(argumentPlaceholder))
{
topicString = topicString.Replace(argumentPlaceholder, $"{{{i}}}");
arg = true;
}
}
}
if (arg)
{
fieldDef.SubscribeResolver = CreateArgumentSubscribeResolver<TMessage>(topicString);
}
else
{
fieldDef.SubscribeResolver = CreateSubscribeResolver<TMessage>(topicString);
}
}
private static SubscribeResolverDelegate CreateSubscribeResolver<TMessage>(
string topicString)
{
return async ctx =>
{
var ct = ctx.RequestAborted;
var receiver = ctx.Service<ITopicEventReceiver>();
return await receiver.SubscribeAsync<TMessage>(
topicString,
null,
null,
ct)
.ConfigureAwait(false);
};
}
private static SubscribeResolverDelegate CreateArgumentSubscribeResolver<TMessage>(
string topicFormatString)
{
return async ctx =>
{
var ct = ctx.RequestAborted;
var arguments = ctx.Selection.Field.Arguments;
var argumentValues = new object[arguments.Count];
// first we capture the argument values.
for (var i = 0; i < arguments.Count; i++)
{
argumentValues[i] = ctx.ArgumentValue<object>(arguments[i].Name);
}
// next we create from it the topic string.
var topicString = string.Format(topicFormatString, argumentValues);
// last we subscribe with the topic string.
var receiver = ctx.Service<ITopicEventReceiver>();
return await receiver.SubscribeAsync<TMessage>(
topicString,
null,
null,
ct)
.ConfigureAwait(false);
};
}
}