-
Notifications
You must be signed in to change notification settings - Fork 4.6k
/
EventHubOptions.cs
327 lines (291 loc) · 14.1 KB
/
EventHubOptions.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Net;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Primitives;
using Azure.Messaging.EventHubs.Processor;
using Microsoft.Azure.WebJobs.EventHubs.Processor;
using Microsoft.Azure.WebJobs.Hosting;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
namespace Microsoft.Azure.WebJobs.EventHubs
{
public class EventHubOptions : IOptionsFormatter
{
/// <summary>
/// Initializes a new <see cref="EventHubOptions"/> instance populated with its default settings.
/// </summary>
/// <remarks>
/// Defaults applied here: a maximum batch size of 100, a minimum batch size of 1, a maximum wait time
/// of one minute, AMQP over TCP as the transport, a prefetch count of 300, the greedy load balancing
/// strategy, and <see cref="EventPosition.Earliest"/> as the default starting position.
/// </remarks>
public EventHubOptions()
{
    // A single connection options instance backs the connection-related properties of this type
    // and is also handed to the processor options, so both always observe the same settings.
    EventHubConnectionOptions connectionOptions = new EventHubConnectionOptions
    {
        TransportType = EventHubsTransportType.AmqpTcp
    };

    ConnectionOptions = connectionOptions;
    EventProcessorOptions = new EventProcessorOptions
    {
        TrackLastEnqueuedEventProperties = false,
        MaximumWaitTime = TimeSpan.FromSeconds(60),
        LoadBalancingStrategy = LoadBalancingStrategy.Greedy,
        PrefetchCount = 300,
        DefaultStartingPosition = EventPosition.Earliest,
        ConnectionOptions = connectionOptions
    };
    InitialOffsetOptions = new InitialOffsetOptions();

    // Batching defaults go through the public setters so they are validated like any other value.
    MinEventBatchSize = 1;
    MaxEventBatchSize = 100;
    MaxWaitTime = TimeSpan.FromMinutes(1);
}
/// <summary>
/// Gets the processor options instance created by the constructor. The retry, prefetch, load balancing,
/// and last-enqueued-event tracking properties of this type read from and write to this instance.
/// </summary>
internal EventProcessorOptions EventProcessorOptions { get; }
/// <summary>
/// Gets the connection options instance created by the constructor. The transport type, proxy, and custom
/// endpoint properties of this type read from and write to this instance, and the constructor also assigns
/// it to the <c>ConnectionOptions</c> member of the processor options.
/// </summary>
internal EventHubConnectionOptions ConnectionOptions { get; }
/// <summary>
/// Gets or sets the protocol and transport used when communicating with the Event Hubs service.
/// </summary>
/// <remarks>
/// The value is held by the underlying connection options; the constructor initializes it to
/// <see cref="EventHubsTransportType.AmqpTcp"/>.
/// </remarks>
public EventHubsTransportType TransportType
{
    get
    {
        return ConnectionOptions.TransportType;
    }

    set
    {
        ConnectionOptions.TransportType = value;
    }
}
/// <summary>
/// Gets or sets the proxy through which web socket communication is routed.
/// </summary>
/// <remarks>
/// Proxies apply only to web socket transports. Communication over TCP cannot use a proxy, so
/// specifying one while web sockets are not in use is an invalid configuration. The value is held
/// by the underlying connection options.
/// </remarks>
public IWebProxy WebProxy
{
    get
    {
        return ConnectionOptions.Proxy;
    }

    set
    {
        ConnectionOptions.Proxy = value;
    }
}
/// <summary>
/// Gets or sets the address used to establish the connection to the Event Hubs service, so that
/// network requests can be routed through application gateways or other paths the host environment requires.
/// </summary>
/// <value>
/// When set, this address replaces the default endpoint of the Event Hubs namespace for the network
/// request itself. The default endpoint, taken from a connection string or a fully qualified namespace,
/// is still required to negotiate the connection with the service.
/// </value>
/// <remarks>The value is held by the underlying connection options.</remarks>
public Uri CustomEndpointAddress
{
    get
    {
        return ConnectionOptions.CustomEndpointAddress;
    }

    set
    {
        ConnectionOptions.CustomEndpointAddress = value;
    }
}
/// <summary>
/// Gets or sets the options that decide whether a failed operation is retried and, when it is, how long
/// to wait between attempts. The same options also bound the time allowed for receiving event batches
/// and for other interactions with the Event Hubs service.
/// </summary>
/// <remarks>The value is held by the underlying processor options.</remarks>
public EventHubsRetryOptions ClientRetryOptions
{
    get
    {
        return EventProcessorOptions.RetryOptions;
    }

    set
    {
        EventProcessorOptions.RetryOptions = value;
    }
}
private int _batchCheckpointFrequency = 1;

/// <summary>
/// Gets or sets how many batches are processed between EventHub cursor checkpoints. The default is 1.
/// </summary>
/// <remarks>This value has no effect when <see cref="EnableCheckpointing"/> is <c>false</c>.</remarks>
/// <exception cref="InvalidOperationException">The assigned value is zero or negative.</exception>
public int BatchCheckpointFrequency
{
    get
    {
        return _batchCheckpointFrequency;
    }

    set
    {
        _batchCheckpointFrequency = value > 0
            ? value
            : throw new InvalidOperationException("Batch checkpoint frequency must be larger than 0.");
    }
}
private int _maxEventBatchSize;

/// <summary>
/// Gets or sets the largest number of events handed to a function in a single batch. Only functions
/// that receive multiple events are affected. The default is 100.
/// </summary>
/// <exception cref="ArgumentException">The assigned value is zero or negative.</exception>
public int MaxEventBatchSize
{
    get
    {
        return _maxEventBatchSize;
    }

    set
    {
        _maxEventBatchSize = value >= 1
            ? value
            : throw new ArgumentException("Batch size must be larger than 0.");
    }
}
private int _minEventBatchSize;

/// <summary>
/// Gets or sets the smallest number of events desired in a batch. Only functions that receive multiple
/// events are affected. The value is expected to be less than <see cref="MaxEventBatchSize"/> and works
/// together with <see cref="MaxWaitTime"/>. The default is 1.
/// </summary>
/// <remarks>
/// The minimum is a target rather than a guarantee: when a full batch cannot be assembled before
/// <see cref="MaxWaitTime"/> elapses, a partial batch is dispatched. Partial batches are also likely on the
/// first invocation of the function after scaling takes place. This setter only rejects values below 1;
/// it does not compare the value against <see cref="MaxEventBatchSize"/>.
/// </remarks>
/// <exception cref="ArgumentException">The assigned value is zero or negative.</exception>
public int MinEventBatchSize
{
    get
    {
        return _minEventBatchSize;
    }

    set
    {
        if (value <= 0)
        {
            throw new ArgumentException("Batch size must be larger than or equal to 1.");
        }

        _minEventBatchSize = value;
    }
}
private TimeSpan _maxWaitTime;

/// <summary>
/// Gets or sets how long the trigger waits for a batch to fill before invoking the function. The setting
/// is only taken into account when <see cref="MinEventBatchSize"/> is greater than 1. If fewer than
/// <see cref="MinEventBatchSize" /> events arrive before the wait elapses, the function is invoked with a
/// partial batch. The default is 60 seconds and the longest allowed wait is 10 minutes.
/// </summary>
/// <remarks>
/// The interval does not guarantee the exact moment of invocation; timer precision introduces a small
/// margin of error. After scaling takes place, the first invocation with a partial batch may happen sooner
/// than configured or may take up to twice the configured <see cref="MaxWaitTime"/>.
/// </remarks>
/// <exception cref="ArgumentException">The assigned value is negative or longer than 10 minutes.</exception>
public TimeSpan MaxWaitTime
{
    get
    {
        return _maxWaitTime;
    }

    set
    {
        TimeSpan longestAllowedWait = TimeSpan.FromMinutes(10);

        if (value < TimeSpan.Zero)
        {
            throw new ArgumentException("Max Wait Time must be larger than or equal to 0.");
        }

        if (value > longestAllowedWait)
        {
            throw new ArgumentException("Max Wait Time must be less than or equal to 10 minutes.");
        }

        _maxWaitTime = value;
    }
}
private int? _targetUnprocessedEventThreshold;

/// <summary>
/// <para>
/// Gets or sets the target number of unprocessed events per worker for Event Hub-triggered functions.
/// Target-based scaling uses it to override the default scaling threshold inferred from the
/// <see cref="MaxEventBatchSize" /> option.
/// </para>
/// <para>
/// When TargetUnprocessedEventThreshold is set, the total unprocessed event count is divided by this value
/// to determine the number of worker instances, which is then rounded up to a worker instance count that
/// creates a balanced partition distribution.
/// </para>
/// </summary>
/// <remarks>Assigning <c>null</c> is accepted and clears any previously configured threshold.</remarks>
/// <exception cref="ArgumentException">The assigned value is non-null and is zero or negative.</exception>
public int? TargetUnprocessedEventThreshold
{
    get
    {
        return _targetUnprocessedEventThreshold;
    }

    set
    {
        // Only a concrete value can be out of range; null means "no override".
        if (value.HasValue && value.Value <= 0)
        {
            throw new ArgumentException("Unprocessed Event Threshold must be larger than 0.");
        }

        _targetUnprocessedEventThreshold = value;
    }
}
/// <summary>
/// Gets the initial offset options to apply when processing. These only apply
/// when no checkpoint information is available.
/// </summary>
/// <remarks>
/// The instance is created by the constructor and the property has no setter, so callers configure
/// the initial offset by changing members of the returned object.
/// </remarks>
public InitialOffsetOptions InitialOffsetOptions { get; }
private bool _enableCheckpointing = true;

/// <summary>
/// Gets or sets whether the trigger writes checkpoints while events are processed.
/// The default is <c>true</c>.
/// </summary>
/// <value><c>true</c> when checkpoints are written; <c>false</c> when they are not.</value>
/// <remarks>
/// This setting outranks <see cref="BatchCheckpointFrequency"/>: when it is <c>false</c>, the
/// configured frequency is ignored.
/// </remarks>
public bool EnableCheckpointing
{
    get => _enableCheckpointing;
    set => _enableCheckpointing = value;
}
/// <inheritdoc cref="EventProcessorOptions.TrackLastEnqueuedEventProperties"/>
/// <remarks>
/// The value is held by the underlying processor options; the constructor initializes it to <c>false</c>.
/// </remarks>
public bool TrackLastEnqueuedEventProperties
{
    get
    {
        return EventProcessorOptions.TrackLastEnqueuedEventProperties;
    }

    set
    {
        EventProcessorOptions.TrackLastEnqueuedEventProperties = value;
    }
}
/// <inheritdoc cref="EventProcessorOptions.PrefetchCount"/>
/// <remarks>
/// The value is held by the underlying processor options; the constructor initializes it to 300.
/// </remarks>
public int PrefetchCount
{
    get
    {
        return EventProcessorOptions.PrefetchCount;
    }

    set
    {
        EventProcessorOptions.PrefetchCount = value;
    }
}
/// <inheritdoc cref="EventProcessorOptions.PrefetchSizeInBytes"/>
/// <remarks>
/// The value is held by the underlying processor options; the constructor of this type does not assign it.
/// </remarks>
public long? PrefetchSizeInBytes
{
    get
    {
        return EventProcessorOptions.PrefetchSizeInBytes;
    }

    set
    {
        EventProcessorOptions.PrefetchSizeInBytes = value;
    }
}
/// <inheritdoc cref="EventProcessorOptions.PartitionOwnershipExpirationInterval"/>
/// <remarks>
/// The value is held by the underlying processor options; the constructor of this type does not assign it.
/// </remarks>
public TimeSpan PartitionOwnershipExpirationInterval
{
    get
    {
        return EventProcessorOptions.PartitionOwnershipExpirationInterval;
    }

    set
    {
        EventProcessorOptions.PartitionOwnershipExpirationInterval = value;
    }
}
/// <inheritdoc cref="EventProcessorOptions.LoadBalancingUpdateInterval"/>
/// <remarks>
/// The value is held by the underlying processor options; the constructor of this type does not assign it.
/// </remarks>
public TimeSpan LoadBalancingUpdateInterval
{
    get
    {
        return EventProcessorOptions.LoadBalancingUpdateInterval;
    }

    set
    {
        EventProcessorOptions.LoadBalancingUpdateInterval = value;
    }
}
private string _checkpointContainer = "azure-webjobs-eventhub";

/// <summary>
/// Gets or sets the name of the Azure Blobs container the event processor uses to coordinate
/// load balancing while listening on an event hub.
/// </summary>
internal string CheckpointContainer
{
    get => _checkpointContainer;
    set => _checkpointContainer = value;
}
private Action<ExceptionReceivedEventArgs> _exceptionHandler;

/// <summary>
/// Gets or sets a callback that accepts an <see cref="ExceptionReceivedEventArgs"/>. It is unset by default.
/// </summary>
/// <remarks>
/// Nothing in this type invokes the callback.
/// NOTE(review): presumably it is called by the processor host when an exception is reported - confirm against callers.
/// </remarks>
internal Action<ExceptionReceivedEventArgs> ExceptionHandler
{
    get => _exceptionHandler;
    set => _exceptionHandler = value;
}
/// <summary>
/// Returns a string representation of this <see cref="EventHubOptions"/> instance.
/// </summary>
/// <remarks>
/// The output is an indented JSON object with one entry per option. The proxy entry holds the proxy address
/// only when <see cref="WebProxy"/> is a <see cref="System.Net.WebProxy"/> that has an address configured;
/// for any other proxy implementation, for a proxy without an address, or when no proxy is set, an empty
/// string is written instead.
/// </remarks>
/// <returns>A string representation of this <see cref="EventHubOptions"/> instance.</returns>
string IOptionsFormatter.Format()
{
    // A System.Net.WebProxy may legitimately have no address (for example, one created with the
    // parameterless constructor), so the address must be checked before it is dereferenced.
    string proxyAddress = string.Empty;
    if (WebProxy is WebProxy proxy && proxy.Address != null)
    {
        proxyAddress = proxy.Address.AbsoluteUri;
    }

    JObject options = new JObject
    {
        { nameof(TargetUnprocessedEventThreshold), TargetUnprocessedEventThreshold },
        { nameof(MaxEventBatchSize), MaxEventBatchSize },
        { nameof(MinEventBatchSize), MinEventBatchSize },
        { nameof(MaxWaitTime), MaxWaitTime },
        { nameof(BatchCheckpointFrequency), BatchCheckpointFrequency },
        { nameof(TransportType), TransportType.ToString()},
        { nameof(WebProxy), proxyAddress },
        { nameof(ClientRetryOptions), ConstructRetryOptions() },
        { nameof(TrackLastEnqueuedEventProperties), TrackLastEnqueuedEventProperties },
        { nameof(PrefetchCount), PrefetchCount },
        { nameof(PrefetchSizeInBytes), PrefetchSizeInBytes },
        { nameof(PartitionOwnershipExpirationInterval), PartitionOwnershipExpirationInterval },
        { nameof(LoadBalancingUpdateInterval), LoadBalancingUpdateInterval },
        { nameof(InitialOffsetOptions), ConstructInitialOffsetOptions() },
        { nameof(EnableCheckpointing), EnableCheckpointing },
    };

    // Only include if not null since it would otherwise not round-trip correctly due to
    // https://github.com/dotnet/runtime/issues/36510. Once this issue is fixed, it can be included
    // unconditionally.
    if (CustomEndpointAddress != null)
    {
        options.Add(nameof(CustomEndpointAddress), CustomEndpointAddress.AbsoluteUri);
    }

    return options.ToString(Formatting.Indented);
}
/// <summary>
/// Builds the JSON object that describes the current <see cref="ClientRetryOptions"/>: the retry mode
/// (as text), the per-try timeout, the delay, the maximum delay, and the maximum number of retries.
/// </summary>
/// <returns>A new <see cref="JObject"/> with one entry per retry setting.</returns>
private JObject ConstructRetryOptions()
{
    EventHubsRetryOptions retryOptions = ClientRetryOptions;

    JObject formatted = new JObject
    {
        { nameof(EventHubsRetryOptions.Mode), retryOptions.Mode.ToString() },
        { nameof(EventHubsRetryOptions.TryTimeout), retryOptions.TryTimeout },
        { nameof(EventHubsRetryOptions.Delay), retryOptions.Delay },
        { nameof(EventHubsRetryOptions.MaximumDelay), retryOptions.MaximumDelay },
        { nameof(EventHubsRetryOptions.MaximumRetries), retryOptions.MaximumRetries },
    };

    return formatted;
}
/// <summary>
/// Builds the JSON object that describes the current <see cref="InitialOffsetOptions"/>: the offset
/// type (as text) and the enqueued time.
/// </summary>
/// <returns>A new <see cref="JObject"/> with one entry per initial offset setting.</returns>
private JObject ConstructInitialOffsetOptions()
{
    InitialOffsetOptions offsetOptions = InitialOffsetOptions;

    JObject formatted = new JObject
    {
        { nameof(InitialOffsetOptions.Type), offsetOptions.Type.ToString() },
        { nameof(InitialOffsetOptions.EnqueuedTimeUtc), offsetOptions.EnqueuedTimeUtc },
    };

    return formatted;
}
}
}