-
-
Notifications
You must be signed in to change notification settings - Fork 723
/
SubscribeResolverObjectFieldDescriptorExtensions.cs
144 lines (133 loc) · 5.3 KB
/
SubscribeResolverObjectFieldDescriptorExtensions.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using HotChocolate.Resolvers;
using HotChocolate.Subscriptions;
using HotChocolate.Utilities.StreamAdapters;
using HotChocolate.Utilities.Subscriptions;
namespace HotChocolate.Types;
/// <summary>
/// Extension methods for <see cref="IObjectFieldDescriptor" /> that register subscribe
/// resolvers backed by observables, enumerables, async enumerables, or topics of an
/// <see cref="ITopicEventReceiver" />.
/// </summary>
public static class SubscribeResolverObjectFieldDescriptorExtensions
{
    /// <summary>
    /// Registers a subscribe resolver whose events come from an asynchronously
    /// resolved <see cref="IObservable{T}" />.
    /// </summary>
    /// <param name="descriptor">
    /// The object field descriptor.
    /// </param>
    /// <param name="subscribe">
    /// A delegate that asynchronously resolves the observable for a resolver context.
    /// </param>
    /// <typeparam name="T">
    /// The type of the message / event payload.
    /// </typeparam>
    /// <returns>
    /// The <see cref="IObjectFieldDescriptor" /> returned by the subscribe registration.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="descriptor" /> or <paramref name="subscribe" /> is <c>null</c>.
    /// </exception>
    public static IObjectFieldDescriptor Subscribe<T>(
        this IObjectFieldDescriptor descriptor,
        Func<IResolverContext, Task<IObservable<T>>> subscribe)
    {
        // Validate eagerly: the delegate is only invoked once a subscription is
        // executed, so a null value would otherwise fail late and far from the caller.
        if (descriptor is null)
        {
            throw new ArgumentNullException(nameof(descriptor));
        }

        if (subscribe is null)
        {
            throw new ArgumentNullException(nameof(subscribe));
        }

        return descriptor.Subscribe(async ctx =>
        {
            var observable = await subscribe(ctx).ConfigureAwait(false);
            return new SourceStreamWrapper(new ObservableSourceStreamAdapter<T>(observable));
        });
    }

    /// <summary>
    /// Registers a subscribe resolver whose events come from a synchronously
    /// resolved <see cref="IObservable{T}" />.
    /// </summary>
    /// <param name="descriptor">
    /// The object field descriptor.
    /// </param>
    /// <param name="subscribe">
    /// A delegate that resolves the observable for a resolver context.
    /// </param>
    /// <typeparam name="T">
    /// The type of the message / event payload.
    /// </typeparam>
    /// <returns>
    /// The <see cref="IObjectFieldDescriptor" /> returned by the subscribe registration.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="descriptor" /> or <paramref name="subscribe" /> is <c>null</c>.
    /// </exception>
    public static IObjectFieldDescriptor Subscribe<T>(
        this IObjectFieldDescriptor descriptor,
        Func<IResolverContext, IObservable<T>> subscribe)
    {
        if (descriptor is null)
        {
            throw new ArgumentNullException(nameof(descriptor));
        }

        if (subscribe is null)
        {
            throw new ArgumentNullException(nameof(subscribe));
        }

        // Forward to the task-based overload by wrapping the result in a completed task.
        return descriptor.Subscribe(ctx => Task.FromResult(subscribe(ctx)));
    }

    /// <summary>
    /// Registers a subscribe resolver whose events come from an asynchronously
    /// resolved <see cref="IEnumerable{T}" />.
    /// </summary>
    /// <param name="descriptor">
    /// The object field descriptor.
    /// </param>
    /// <param name="subscribe">
    /// A delegate that asynchronously resolves the enumerable for a resolver context.
    /// </param>
    /// <typeparam name="T">
    /// The type of the message / event payload.
    /// </typeparam>
    /// <returns>
    /// The <see cref="IObjectFieldDescriptor" /> returned by the subscribe registration.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="descriptor" /> or <paramref name="subscribe" /> is <c>null</c>.
    /// </exception>
    public static IObjectFieldDescriptor Subscribe<T>(
        this IObjectFieldDescriptor descriptor,
        Func<IResolverContext, Task<IEnumerable<T>>> subscribe)
    {
        if (descriptor is null)
        {
            throw new ArgumentNullException(nameof(descriptor));
        }

        if (subscribe is null)
        {
            throw new ArgumentNullException(nameof(subscribe));
        }

        return descriptor.Subscribe(async ctx =>
        {
            var enumerable = await subscribe(ctx).ConfigureAwait(false);
            return new SourceStreamWrapper(new EnumerableStreamAdapter<T>(enumerable));
        });
    }

    /// <summary>
    /// Registers a subscribe resolver whose events come from a synchronously
    /// resolved <see cref="IEnumerable{T}" />.
    /// </summary>
    /// <param name="descriptor">
    /// The object field descriptor.
    /// </param>
    /// <param name="subscribe">
    /// A delegate that resolves the enumerable for a resolver context.
    /// </param>
    /// <typeparam name="T">
    /// The type of the message / event payload.
    /// </typeparam>
    /// <returns>
    /// The <see cref="IObjectFieldDescriptor" /> returned by the subscribe registration.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="descriptor" /> or <paramref name="subscribe" /> is <c>null</c>.
    /// </exception>
    public static IObjectFieldDescriptor Subscribe<T>(
        this IObjectFieldDescriptor descriptor,
        Func<IResolverContext, IEnumerable<T>> subscribe)
    {
        if (descriptor is null)
        {
            throw new ArgumentNullException(nameof(descriptor));
        }

        if (subscribe is null)
        {
            throw new ArgumentNullException(nameof(subscribe));
        }

        // Forward to the task-based overload by wrapping the result in a completed task.
        return descriptor.Subscribe(ctx => Task.FromResult(subscribe(ctx)));
    }

    /// <summary>
    /// Registers a subscribe resolver whose events come from an asynchronously
    /// resolved <see cref="IAsyncEnumerable{T}" />.
    /// </summary>
    /// <param name="descriptor">
    /// The object field descriptor.
    /// </param>
    /// <param name="subscribe">
    /// A delegate that asynchronously resolves the async enumerable for a resolver context.
    /// </param>
    /// <typeparam name="T">
    /// The type of the message / event payload.
    /// </typeparam>
    /// <returns>
    /// The <see cref="IObjectFieldDescriptor" /> returned by the subscribe registration.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="descriptor" /> or <paramref name="subscribe" /> is <c>null</c>.
    /// </exception>
    public static IObjectFieldDescriptor Subscribe<T>(
        this IObjectFieldDescriptor descriptor,
        Func<IResolverContext, Task<IAsyncEnumerable<T>>> subscribe)
    {
        if (descriptor is null)
        {
            throw new ArgumentNullException(nameof(descriptor));
        }

        if (subscribe is null)
        {
            throw new ArgumentNullException(nameof(subscribe));
        }

        return descriptor.Subscribe(async ctx =>
        {
            var enumerable = await subscribe(ctx).ConfigureAwait(false);
            return new SourceStreamWrapper(new AsyncEnumerableStreamAdapter<T>(enumerable));
        });
    }

    /// <summary>
    /// Registers a subscribe resolver whose events come from a synchronously
    /// resolved <see cref="IAsyncEnumerable{T}" />.
    /// </summary>
    /// <param name="descriptor">
    /// The object field descriptor.
    /// </param>
    /// <param name="subscribe">
    /// A delegate that resolves the async enumerable for a resolver context.
    /// </param>
    /// <typeparam name="T">
    /// The type of the message / event payload.
    /// </typeparam>
    /// <returns>
    /// The <see cref="IObjectFieldDescriptor" /> returned by the subscribe registration.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="descriptor" /> or <paramref name="subscribe" /> is <c>null</c>.
    /// </exception>
    public static IObjectFieldDescriptor Subscribe<T>(
        this IObjectFieldDescriptor descriptor,
        Func<IResolverContext, IAsyncEnumerable<T>> subscribe)
    {
        if (descriptor is null)
        {
            throw new ArgumentNullException(nameof(descriptor));
        }

        if (subscribe is null)
        {
            throw new ArgumentNullException(nameof(subscribe));
        }

        // Forward to the task-based overload by wrapping the result in a completed task.
        return descriptor.Subscribe(ctx => Task.FromResult(subscribe(ctx)));
    }

    /// <summary>
    /// Subscribes to a fixed topic on the <see cref="ITopicEventReceiver" />.
    /// </summary>
    /// <param name="descriptor">
    /// The object field descriptor.
    /// </param>
    /// <param name="topicName">
    /// A name representing the topic.
    /// </param>
    /// <typeparam name="TMessage">
    /// The type of the message / event payload.
    /// </typeparam>
    /// <returns>
    /// The <see cref="IObjectFieldDescriptor" /> returned by the subscribe registration.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="descriptor" /> or <paramref name="topicName" /> is <c>null</c>.
    /// </exception>
    public static IObjectFieldDescriptor SubscribeToTopic<TMessage>(
        this IObjectFieldDescriptor descriptor,
        string topicName)
    {
        if (descriptor is null)
        {
            throw new ArgumentNullException(nameof(descriptor));
        }

        if (topicName is null)
        {
            throw new ArgumentNullException(nameof(topicName));
        }

        // The topic is constant, so the resolver context is ignored.
        return SubscribeToTopic<string, TMessage>(
            descriptor,
            _ => topicName);
    }

    /// <summary>
    /// Subscribes to a topic that is represented by an argument value.
    /// </summary>
    /// <param name="descriptor">
    /// The object field descriptor.
    /// </param>
    /// <param name="argumentName">
    /// The name of the argument that is used to resolve the topic.
    /// </param>
    /// <typeparam name="TTopic">
    /// The topic type.
    /// </typeparam>
    /// <typeparam name="TMessage">
    /// The type of the message / event payload.
    /// </typeparam>
    /// <returns>
    /// The <see cref="IObjectFieldDescriptor" /> returned by the subscribe registration.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="descriptor" /> or <paramref name="argumentName" /> is <c>null</c>.
    /// </exception>
    public static IObjectFieldDescriptor SubscribeToTopic<TTopic, TMessage>(
        this IObjectFieldDescriptor descriptor,
        string argumentName)
    {
        if (descriptor is null)
        {
            throw new ArgumentNullException(nameof(descriptor));
        }

        if (argumentName is null)
        {
            throw new ArgumentNullException(nameof(argumentName));
        }

        // The topic is read from the named field argument for each subscription.
        return SubscribeToTopic<TTopic, TMessage>(
            descriptor,
            ctx => ctx.ArgumentValue<TTopic>(argumentName));
    }

    /// <summary>
    /// Subscribes to a topic that is resolved by executing <paramref name="resolveTopic" />.
    /// </summary>
    /// <param name="descriptor">
    /// The object field descriptor.
    /// </param>
    /// <param name="resolveTopic">
    /// A delegate that resolves a value that will be used as the topic.
    /// </param>
    /// <typeparam name="TTopic">
    /// The topic type.
    /// </typeparam>
    /// <typeparam name="TMessage">
    /// The type of the message / event payload.
    /// </typeparam>
    /// <returns>
    /// The <see cref="IObjectFieldDescriptor" /> returned by the subscribe registration.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="descriptor" /> or <paramref name="resolveTopic" /> is <c>null</c>.
    /// </exception>
    public static IObjectFieldDescriptor SubscribeToTopic<TTopic, TMessage>(
        this IObjectFieldDescriptor descriptor,
        Func<IResolverContext, TTopic> resolveTopic)
    {
        if (descriptor is null)
        {
            throw new ArgumentNullException(nameof(descriptor));
        }

        if (resolveTopic is null)
        {
            throw new ArgumentNullException(nameof(resolveTopic));
        }

        // Adapt the synchronous delegate to the ValueTask-based overload.
        return SubscribeToTopic<TTopic, TMessage>(
            descriptor,
            ctx => new ValueTask<TTopic>(resolveTopic(ctx)));
    }

    /// <summary>
    /// Subscribes to a topic that is resolved by asynchronously executing
    /// <paramref name="resolveTopic" />.
    /// </summary>
    /// <param name="descriptor">
    /// The object field descriptor.
    /// </param>
    /// <param name="resolveTopic">
    /// A delegate that asynchronously resolves a value that will be used as the topic.
    /// </param>
    /// <typeparam name="TTopic">
    /// The topic type.
    /// </typeparam>
    /// <typeparam name="TMessage">
    /// The type of the message / event payload.
    /// </typeparam>
    /// <returns>
    /// The <see cref="IObjectFieldDescriptor" /> returned by the subscribe registration.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="descriptor" /> or <paramref name="resolveTopic" /> is <c>null</c>.
    /// </exception>
    public static IObjectFieldDescriptor SubscribeToTopic<TTopic, TMessage>(
        this IObjectFieldDescriptor descriptor,
        Func<IResolverContext, ValueTask<TTopic>> resolveTopic)
    {
        if (descriptor is null)
        {
            throw new ArgumentNullException(nameof(descriptor));
        }

        if (resolveTopic is null)
        {
            throw new ArgumentNullException(nameof(resolveTopic));
        }

        return descriptor.Subscribe(async ctx =>
        {
            // The receiver is taken from the resolver context's services
            // each time a subscription is started.
            var receiver = ctx.Service<ITopicEventReceiver>();
            var topic = await resolveTopic(ctx).ConfigureAwait(false);
            return await receiver.SubscribeAsync<TTopic, TMessage>(topic).ConfigureAwait(false);
        });
    }
}