-
Notifications
You must be signed in to change notification settings - Fork 262
/
DurableTaskExtension.cs
1593 lines (1387 loc) · 75.4 KB
/
DurableTaskExtension.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See LICENSE in the project root for license information.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Reflection;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using DurableTask.AzureStorage;
using DurableTask.Core;
using DurableTask.Core.Exceptions;
using DurableTask.Core.History;
using DurableTask.Core.Middleware;
using Microsoft.Azure.WebJobs.Description;
#if !FUNCTIONS_V1
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Correlation;
using Microsoft.Azure.WebJobs.Host.Scale;
#endif
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Listener;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Host.Config;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Logging;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
{
/// <summary>
/// Configuration for the Durable Functions extension.
/// </summary>
#if !FUNCTIONS_V1
[Extension("DurableTask", "DurableTask")]
#endif
public class DurableTaskExtension :
IExtensionConfigProvider,
IDisposable,
IAsyncConverter<HttpRequestMessage, HttpResponseMessage>,
INameVersionObjectManager<TaskOrchestration>,
INameVersionObjectManager<TaskActivity>
{
// Name of the storage provider that is selected when the options do not specify a "type"
// (see GetDurabilityProviderFactory).
private const string DefaultProvider = AzureStorageDurabilityProviderFactory.ProviderName;
// Category name used when creating this extension's ILogger.
internal static readonly string LoggerCategoryName = LogCategories.CreateTriggerCategory("DurableTask");
// Creating client objects is expensive, so we cache them when the attributes match.
// Note that DurableClientAttribute defines a custom equality comparer.
private readonly ConcurrentDictionary<DurableClientAttribute, DurableClient> cachedClients =
new ConcurrentDictionary<DurableClientAttribute, DurableClient>();
// Function registrations keyed by function name, one dictionary per function kind.
// knownActivities is consulted when an activity is looked up by name.
private readonly ConcurrentDictionary<FunctionName, RegisteredFunctionInfo> knownOrchestrators =
new ConcurrentDictionary<FunctionName, RegisteredFunctionInfo>();
private readonly ConcurrentDictionary<FunctionName, RegisteredFunctionInfo> knownEntities =
new ConcurrentDictionary<FunctionName, RegisteredFunctionInfo>();
private readonly ConcurrentDictionary<FunctionName, RegisteredFunctionInfo> knownActivities =
new ConcurrentDictionary<FunctionName, RegisteredFunctionInfo>();
private readonly AsyncLock taskHubLock = new AsyncLock();
#if FUNCTIONS_V2_OR_GREATER
// Assigned and initialized by the main constructor; the "field is never used" warning is suppressed.
#pragma warning disable CS0169
private readonly ITelemetryActivator telemetryActivator;
#pragma warning restore CS0169
#endif
#if FUNCTIONS_V3_OR_GREATER
// Created by the main constructor only when the MiddlewarePassthrough protocol is selected.
private readonly LocalGrpcListener localGrpcListener;
#endif
// True when the main constructor configured the options; false after the V1 default constructor,
// in which case Initialize performs the configuration via InitializeForFunctionsV1.
private readonly bool isOptionsConfigured;
#pragma warning disable CS0612 // Type or member is obsolete
#pragma warning disable SA1401 // Fields should be private
internal IPlatformInformation PlatformInformationService;
#pragma warning restore SA1401 // Fields should be private
#pragma warning restore CS0612 // Type or member is obsolete
// The fields below are not readonly because the Functions V1 path assigns several of them
// after construction (see InitializeForFunctionsV1).
private IDurabilityProviderFactory durabilityProviderFactory;
private INameResolver nameResolver;
private ILoggerFactory loggerFactory;
private DurabilityProvider defaultDurabilityProvider;
// Created in IExtensionConfigProvider.Initialize.
private TaskHubWorker taskHubWorker;
private bool isTaskHubWorkerStarted;
// HTTP client handed to TaskHttpActivityShim instances.
private HttpClient durableHttpClient;
// Set by InitializeLinuxLogging and released in Dispose.
private EventSourceListener eventSourceListener;
#if FUNCTIONS_V1
private IConnectionInfoResolver connectionInfoResolver;
/// <summary>
/// Obsolete parameterless constructor. Prefer one of the other constructor overloads.
/// </summary>
/// <remarks>
/// Only an empty options object is created here; the options are flagged as not yet
/// configured so that the remaining setup happens during extension initialization.
/// </remarks>
[Obsolete("The default constructor is obsolete and will be removed in future versions")]
public DurableTaskExtension()
{
    // Mark the options as unconfigured: they are populated later, at initialization time.
    this.isOptionsConfigured = false;
    this.Options = new DurableTaskOptions();
}
#endif
/// <summary>
/// Initializes a new instance of the <see cref="DurableTaskExtension"/>.
/// </summary>
/// <param name="options">The configuration options for this extension.</param>
/// <param name="loggerFactory">The logger factory used for extension-specific logging and orchestration tracking.</param>
/// <param name="nameResolver">The name resolver to use for looking up application settings.</param>
/// <param name="orchestrationServiceFactories">The factories used to create orchestration service based on the configured storage provider.</param>
/// <param name="hostLifetimeService">The host shutdown notification service for detecting and reacting to host shutdowns. When null, the no-op lifetime service is used.</param>
/// <param name="durableHttpMessageHandlerFactory">The HTTP message handler that handles HTTP requests and HTTP responses.</param>
/// <param name="lifeCycleNotificationHelper">The lifecycle notification helper used for custom orchestration tracking.</param>
/// <param name="messageSerializerSettingsFactory">The factory used to create <see cref="JsonSerializerSettings"/> for message settings.</param>
/// <param name="platformInformationService">The platform information provider to inspect the OS, app service plan, and other environment information. Required even though the parameter declares a null default.</param>
/// <param name="errorSerializerSettingsFactory">The factory used to create <see cref="JsonSerializerSettings"/> for error settings.</param>
/// <param name="webhookProvider">Provides webhook urls for HTTP management APIs.</param>
/// <param name="telemetryActivator">The activator of DistributedTracing. .netstandard2.0 only.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="nameResolver"/>, <paramref name="loggerFactory"/> or <paramref name="platformInformationService"/> is null.
/// </exception>
public DurableTaskExtension(
    IOptions<DurableTaskOptions> options,
    ILoggerFactory loggerFactory,
    INameResolver nameResolver,
    IEnumerable<IDurabilityProviderFactory> orchestrationServiceFactories,
    IApplicationLifetimeWrapper hostLifetimeService,
    IDurableHttpMessageHandlerFactory durableHttpMessageHandlerFactory = null,
    ILifeCycleNotificationHelper lifeCycleNotificationHelper = null,
    IMessageSerializerSettingsFactory messageSerializerSettingsFactory = null,
#pragma warning disable CS0612 // Type or member is obsolete
    IPlatformInformation platformInformationService = null,
#pragma warning restore CS0612 // Type or member is obsolete
#if FUNCTIONS_V2_OR_GREATER
    IErrorSerializerSettingsFactory errorSerializerSettingsFactory = null,
#pragma warning disable CS0618 // Type or member is obsolete
    IWebHookProvider webhookProvider = null,
#pragma warning restore CS0618 // Type or member is obsolete
    ITelemetryActivator telemetryActivator = null)
#else
    IErrorSerializerSettingsFactory errorSerializerSettingsFactory = null)
#endif
{
    // Options will be null in Functions v1 runtime - populated later.
    this.Options = options?.Value ?? new DurableTaskOptions();
    this.nameResolver = nameResolver ?? throw new ArgumentNullException(nameof(nameResolver));
    this.loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
    this.PlatformInformationService = platformInformationService ?? throw new ArgumentNullException(nameof(platformInformationService));
    this.ResolveAppSettingOptions();

    ILogger logger = loggerFactory.CreateLogger(LoggerCategoryName);
    this.TraceHelper = new EndToEndTraceHelper(logger, this.Options.Tracing.TraceReplayEvents);
    this.LifeCycleNotificationHelper = lifeCycleNotificationHelper ?? this.CreateLifeCycleNotificationHelper();
    this.durabilityProviderFactory = this.GetDurabilityProviderFactory(this.Options, logger, orchestrationServiceFactories);
    this.defaultDurabilityProvider = this.durabilityProviderFactory.GetDurabilityProvider();
    this.isOptionsConfigured = true;

    if (durableHttpMessageHandlerFactory == null)
    {
        durableHttpMessageHandlerFactory = new DurableHttpMessageHandlerFactory();
    }

    DurableHttpClientFactory durableHttpClientFactory = new DurableHttpClientFactory();
    this.durableHttpClient = durableHttpClientFactory.GetClient(durableHttpMessageHandlerFactory);

    this.MessageDataConverter = CreateMessageDataConverter(messageSerializerSettingsFactory);
    this.ErrorDataConverter = this.CreateErrorDataConverter(errorSerializerSettingsFactory);

    this.TypedCodeProvider = new TypedCodeProvider();
    this.TypedCodeProvider.Initialize();

    this.HttpApiHandler = new HttpApiHandler(this, logger);
#if !FUNCTIONS_V1
    // This line ensures every time we need the webhook URI, we get it directly from the
    // function runtime, which has the most up-to-date knowledge about the site hostname.
    // NOTE(review): webhookProvider defaults to null; the delegate below would then fail when
    // invoked unless WebhookUriProviderOverride is set - confirm callers always supply one.
    Func<Uri> webhookDelegate = () => webhookProvider.GetUrl(this);
    this.HttpApiHandler.RegisterWebhookProvider(
        this.Options.WebhookUriProviderOverride ??
        webhookDelegate);
#endif

    // Keep the property's no-op default when no lifetime service is supplied; assigning null
    // here would make the OnStopped registrations below throw a NullReferenceException.
    this.HostLifetimeService = hostLifetimeService ?? HostLifecycleService.NoOp;

#if !FUNCTIONS_V1
    // The RPC server is started when the extension is initialized.
    // The RPC server is stopped when the host has finished shutting down.
    this.HostLifetimeService.OnStopped.Register(this.StopLocalHttpServer);
    this.telemetryActivator = telemetryActivator;
    this.telemetryActivator?.Initialize(logger);
#endif

    // Starting with .NET isolated and Java, we have a more efficient out-of-process
    // function invocation protocol. Other languages will use the existing protocol.
    WorkerRuntimeType runtimeType = this.PlatformInformationService.GetWorkerRuntimeType();
    if (runtimeType == WorkerRuntimeType.DotNetIsolated || runtimeType == WorkerRuntimeType.Java)
    {
        this.OutOfProcProtocol = OutOfProcOrchestrationProtocol.MiddlewarePassthrough;
#if FUNCTIONS_V3_OR_GREATER
        this.localGrpcListener = new LocalGrpcListener(this, this.defaultDurabilityProvider);
        this.HostLifetimeService.OnStopped.Register(this.StopLocalGrpcServer);
#endif
    }
    else
    {
        this.OutOfProcProtocol = OutOfProcOrchestrationProtocol.OrchestratorShim;
    }
}
#if FUNCTIONS_V1
// Functions V1-only overload that additionally accepts a connection info resolver.
// All other arguments are forwarded to the public constructor. The platform information
// service must be forwarded too: the public constructor throws ArgumentNullException when
// it is missing, so omitting it would make this overload unusable.
internal DurableTaskExtension(
    IOptions<DurableTaskOptions> options,
    ILoggerFactory loggerFactory,
    INameResolver nameResolver,
    IEnumerable<IDurabilityProviderFactory> orchestrationServiceFactories,
    IConnectionInfoResolver connectionInfoResolver,
    IApplicationLifetimeWrapper shutdownNotification,
    IDurableHttpMessageHandlerFactory durableHttpMessageHandlerFactory,
#pragma warning disable CS0612 // Type or member is obsolete
    IPlatformInformation platformInformationService)
#pragma warning restore CS0612 // Type or member is obsolete
    : this(
        options,
        loggerFactory,
        nameResolver,
        orchestrationServiceFactories,
        shutdownNotification,
        durableHttpMessageHandlerFactory,
        platformInformationService: platformInformationService)
{
    this.connectionInfoResolver = connectionInfoResolver;
}
/// <summary>
/// Gets or sets default task hub name to be used by all <see cref="IDurableClient"/>, <see cref="IDurableOrchestrationClient"/>, <see cref="IDurableEntityClient"/>,
/// <see cref="IDurableOrchestrationContext"/>, and <see cref="IDurableActivityContext"/> instances.
/// </summary>
/// <remarks>
/// A task hub is a logical grouping of storage resources. Using different task hub names lets
/// several Durable Functions applications share one storage backend while staying isolated from each other.
/// The value is read from and written to the extension options.
/// </remarks>
/// <value>The name of the default task hub.</value>
public string HubName
{
    get => this.Options.HubName;
    set => this.Options.HubName = value;
}
#endif
// Extension options; assigned by the constructors.
internal DurableTaskOptions Options { get; }
// The members below are created by the main constructor, or by InitializeForFunctionsV1
// when the Functions V1 default constructor was used.
internal HttpApiHandler HttpApiHandler { get; private set; }
internal ILifeCycleNotificationHelper LifeCycleNotificationHelper { get; private set; }
internal EndToEndTraceHelper TraceHelper { get; private set; }
internal MessagePayloadDataConverter MessageDataConverter { get; private set; }
internal MessagePayloadDataConverter ErrorDataConverter { get; private set; }
internal TypedCodeProvider TypedCodeProvider { get; private set; }
// Zero when the default durability provider guarantees ordered delivery; otherwise the
// configured EntityMessageReorderWindowInMinutes expressed as a TimeSpan.
internal TimeSpan MessageReorderWindow
=> this.defaultDurabilityProvider.GuaranteesOrderedDelivery ? TimeSpan.Zero : TimeSpan.FromMinutes(this.Options.EntityMessageReorderWindowInMinutes);
// Mirrors the capability flag reported by the default durability provider.
internal bool UseImplicitEntityDeletion => this.defaultDurabilityProvider.SupportsImplicitEntityDeletion;
// Starts out as the no-op lifetime service; the main constructor assigns it from its
// hostLifetimeService argument.
internal IApplicationLifetimeWrapper HostLifetimeService { get; } = HostLifecycleService.NoOp;
// Chosen by the main constructor from the worker runtime type: MiddlewarePassthrough for
// .NET isolated and Java, OrchestratorShim for everything else.
internal OutOfProcOrchestrationProtocol OutOfProcProtocol { get; }
// Builds the data converter used for message payloads.
// When no factory is supplied, the built-in MessageSerializerSettingsFactory is used. The
// converter is told whether the settings came from the built-in factory type.
internal static MessagePayloadDataConverter CreateMessageDataConverter(IMessageSerializerSettingsFactory messageSerializerSettingsFactory)
{
    IMessageSerializerSettingsFactory settingsFactory =
        messageSerializerSettingsFactory ?? new MessageSerializerSettingsFactory();

    bool usesBuiltInFactory = settingsFactory is MessageSerializerSettingsFactory;
    var serializerSettings = settingsFactory.CreateJsonSerializerSettings();

    return new MessagePayloadDataConverter(serializerSettings, usesBuiltInFactory);
}
// Builds the data converter used for error payloads.
// When no factory is supplied, the built-in ErrorSerializerSettingsFactory is used. The
// converter is told whether the settings came from the built-in factory type.
private MessagePayloadDataConverter CreateErrorDataConverter(IErrorSerializerSettingsFactory errorSerializerSettingsFactory)
{
    IErrorSerializerSettingsFactory settingsFactory =
        errorSerializerSettingsFactory ?? new ErrorSerializerSettingsFactory();

    bool usesBuiltInFactory = settingsFactory is ErrorSerializerSettingsFactory;
    var serializerSettings = settingsFactory.CreateJsonSerializerSettings();

    return new MessagePayloadDataConverter(serializerSettings, usesBuiltInFactory);
}
// Selects the durability provider factory to use.
// - When the options' StorageProvider section has no "type" entry, the factory whose name
//   equals DefaultProvider is returned.
// - Otherwise the factory whose name matches the configured type (case-insensitively) is returned.
// Throws InvalidOperationException when no matching factory is registered. A "type" entry
// holding a null value is reported the same way instead of failing with a NullReferenceException.
private IDurabilityProviderFactory GetDurabilityProviderFactory(DurableTaskOptions options, ILogger logger, IEnumerable<IDurabilityProviderFactory> orchestrationServiceFactories)
{
    if (!options.StorageProvider.TryGetValue("type", out object storageType))
    {
        IDurabilityProviderFactory defaultFactory = orchestrationServiceFactories.FirstOrDefault(
            f => string.Equals(f.Name, DefaultProvider, StringComparison.Ordinal));
        if (defaultFactory == null)
        {
            throw new InvalidOperationException($"Couldn't find the default storage provider: {DefaultProvider}.");
        }

        logger.LogInformation($"Using the default storage provider: {DefaultProvider}.");
        return defaultFactory;
    }

    // The configured value is an object and may be null; never dereference it directly.
    string requestedType = storageType?.ToString();
    IDurabilityProviderFactory selectedFactory = orchestrationServiceFactories.FirstOrDefault(
        f => string.Equals(f.Name, requestedType, StringComparison.OrdinalIgnoreCase));
    if (selectedFactory == null)
    {
        IList<string> factoryNames = orchestrationServiceFactories.Select(f => f.Name).ToList();
        throw new InvalidOperationException($"Storage provider type ({storageType}) was not found. Available storage providers: {string.Join(", ", factoryNames)}.");
    }

    logger.LogInformation($"Using the {storageType} storage provider.");
    return selectedFactory;
}
// Returns the backend description reported by the default durability provider.
internal string GetBackendInfo() => this.defaultDurabilityProvider.GetBackendInfo();
/// <summary>
/// Internal initialization call from the WebJobs host.
/// </summary>
/// <remarks>
/// In order, this method: sets up Linux logging when applicable, performs the late Functions V1
/// configuration if the options were not configured by the constructor, validates the options,
/// registers the client and trigger binding rules, creates the task hub worker with its
/// dispatcher middleware, and finally starts the local gRPC or HTTP server that matches the
/// selected out-of-proc protocol.
/// </remarks>
/// <param name="context">Extension context provided by WebJobs.</param>
void IExtensionConfigProvider.Initialize(ExtensionConfigContext context)
{
#if !FUNCTIONS_V1
// Functions V1 is not supported in linux, so this is conditionally compiled
// We initialize linux logging early on in case any initialization steps below were to trigger a log event.
if (this.PlatformInformationService.GetOperatingSystem() == OperatingSystem.Linux)
{
this.InitializeLinuxLogging();
}
#endif
// NOTE(review): ConfigureLoaderHooks is defined outside the visible part of this file.
ConfigureLoaderHooks();
// Functions V1 has its configuration initialized at startup time (now).
// For Functions V2 (and for some unit tests) configuration happens earlier in the pipeline.
if (!this.isOptionsConfigured)
{
this.InitializeForFunctionsV1(context);
}
// Throw if any of the configured options are invalid
this.Options.Validate(this.nameResolver, this.TraceHelper);
#pragma warning disable CS0618 // Type or member is obsolete
// Invoke webhook handler to make functions runtime register extension endpoints.
var initialWebhookUri = context.GetWebhookHandler();
#if FUNCTIONS_V1
// In Functions V1, there is no notion of an IWebhookProvider that
// we can dynamically call to fetch the webhook URI, and since context.GetWebhookHandler()
// only works in the scope of the Initialize() function, we just have to live with the static URI
// we grab now.
Func<Uri> staticWebhookHandler = () => initialWebhookUri;
this.HttpApiHandler.RegisterWebhookProvider(
this.Options.WebhookUriProviderOverride ??
staticWebhookHandler);
#endif
#pragma warning restore CS0618 // Type or member is obsolete
this.TraceConfigurationSettings();
var bindings = new BindingHelper(this);
// Note that the order of the rules is important
var rule = context.AddBindingRule<DurableClientAttribute>()
.AddConverter<string, StartOrchestrationArgs>(bindings.StringToStartOrchestrationArgs)
.AddConverter<JObject, StartOrchestrationArgs>(bindings.JObjectToStartOrchestrationArgs)
.AddConverter<IDurableClient, string>(bindings.DurableOrchestrationClientToString);
rule.BindToCollector<StartOrchestrationArgs>(bindings.CreateAsyncCollector);
rule.BindToInput<IDurableOrchestrationClient>(this.GetClient);
rule.BindToInput<IDurableEntityClient>(this.GetClient);
rule.BindToInput<IDurableClient>(this.GetClient);
// The typed client binding is only registered when the typed code provider reports it is initialized.
if (this.TypedCodeProvider.IsInitialized)
{
rule.Bind(new TypedDurableClientBindingProvider(this.TypedCodeProvider, this.GetClient));
}
// We add a binding rule to support the now deprecated `orchestrationClient` binding
// A cleaner solution would be to have the prior rule have an OR-style selector between
// DurableClientAttribute and OrchestrationClientAttribute, but it's unclear if that's
// possible (for now).
#pragma warning disable CS0618 // Type or member is obsolete
var backwardsCompRule = context.AddBindingRule<OrchestrationClientAttribute>()
#pragma warning restore CS0618 // Type or member is obsolete
.AddConverter<string, StartOrchestrationArgs>(bindings.StringToStartOrchestrationArgs)
.AddConverter<JObject, StartOrchestrationArgs>(bindings.JObjectToStartOrchestrationArgs)
.AddConverter<IDurableClient, string>(bindings.DurableOrchestrationClientToString);
backwardsCompRule.BindToCollector<StartOrchestrationArgs>(bindings.CreateAsyncCollector);
backwardsCompRule.BindToInput<IDurableOrchestrationClient>(this.GetClient);
backwardsCompRule.BindToInput<IDurableEntityClient>(this.GetClient);
backwardsCompRule.BindToInput<IDurableClient>(this.GetClient);
if (this.TypedCodeProvider.IsInitialized)
{
backwardsCompRule.Bind(new TypedDurableClientBindingProvider(this.TypedCodeProvider, this.GetClient));
}
// Only the Azure Storage provider factory exposes a default connection name; the trigger
// binding providers receive null for any other factory.
string connectionName = this.durabilityProviderFactory is AzureStorageDurabilityProviderFactory azureStorageDurabilityProviderFactory
? azureStorageDurabilityProviderFactory.DefaultConnectionName
: null;
context.AddBindingRule<OrchestrationTriggerAttribute>()
.BindToTrigger(new OrchestrationTriggerAttributeBindingProvider(this, connectionName));
context.AddBindingRule<ActivityTriggerAttribute>()
.BindToTrigger(new ActivityTriggerAttributeBindingProvider(this, connectionName));
context.AddBindingRule<EntityTriggerAttribute>()
.BindToTrigger(new EntityTriggerAttributeBindingProvider(this, connectionName));
// This extension instance is passed as both the orchestration and the activity object manager.
this.taskHubWorker = new TaskHubWorker(this.defaultDurabilityProvider, this, this, this.loggerFactory);
// Add middleware to the DTFx dispatcher so that we can inject our own logic
// into and customize the orchestration execution pipeline.
// Note that the order of the middleware added determines the order in which it executes.
if (this.OutOfProcProtocol == OutOfProcOrchestrationProtocol.MiddlewarePassthrough)
{
#if FUNCTIONS_V3_OR_GREATER
// This is a newer, more performant flavor of orchestration/activity middleware that is being
// enabled for newer language runtimes. Support for entities in this model is TBD.
var ooprocMiddleware = new OutOfProcMiddleware(this);
this.taskHubWorker.AddActivityDispatcherMiddleware(ooprocMiddleware.CallActivityAsync);
this.taskHubWorker.AddOrchestrationDispatcherMiddleware(ooprocMiddleware.CallOrchestratorAsync);
#else
// This can happen if, for example, a Java user tries to use Durable Functions while targeting V2 or V3 extension bundles
// because those bundles target .NET Core 2.2, which doesn't support the gRPC libraries used in the modern out-of-proc implementation.
throw new PlatformNotSupportedException(
"This project type is not supported on this version of the Azure Functions runtime. Please upgrade to Azure Functions V3 or higher. " +
"If you are using a language that supports extension bundles, please use extension bundles V4 or higher. " +
"For more information on Azure Functions versions, see https://docs.microsoft.com/azure/azure-functions/functions-versions. " +
"For more information on extension bundles, see https://docs.microsoft.com/azure/azure-functions/functions-bindings-register#extension-bundles.");
#endif
}
else
{
// This is the older middleware implementation that is currently in use for in-process .NET
// and the older out-of-proc languages, like Node.js, Python, and PowerShell.
this.taskHubWorker.AddActivityDispatcherMiddleware(this.ActivityMiddleware);
this.taskHubWorker.AddOrchestrationDispatcherMiddleware(this.EntityMiddleware);
this.taskHubWorker.AddOrchestrationDispatcherMiddleware(this.OrchestrationMiddleware);
}
// The RPC server needs to be started sometime before any functions can be triggered
// and this is the latest point in the pipeline available to us.
#if FUNCTIONS_V3_OR_GREATER
if (this.OutOfProcProtocol == OutOfProcOrchestrationProtocol.MiddlewarePassthrough)
{
this.StartLocalGrpcServer();
}
#endif
#if FUNCTIONS_V2_OR_GREATER
if (this.OutOfProcProtocol == OutOfProcOrchestrationProtocol.OrchestratorShim)
{
this.StartLocalHttpServer();
}
#endif
}
// Returns the address of the local RPC endpoint: the gRPC listener's address when the
// MiddlewarePassthrough protocol is in use (V3 or greater only), otherwise the HTTP API
// handler's base URL.
internal string GetLocalRpcAddress()
{
#if FUNCTIONS_V3_OR_GREATER
    bool usesGrpcListener = this.OutOfProcProtocol == OutOfProcOrchestrationProtocol.MiddlewarePassthrough;
    if (usesGrpcListener)
    {
        return this.localGrpcListener.ListenAddress;
    }

#endif
    return this.HttpApiHandler.GetBaseUrl();
}
/// <summary>
/// Sets up the App Service logging pipeline used on Linux by creating the
/// event source listener that forwards events to a Linux-specific logger.
/// </summary>
private void InitializeLinuxLogging()
{
    // Gather the facts about the hosting environment that the logger needs.
    bool isConsumptionPlan = this.PlatformInformationService.IsInConsumptionPlan();
    string linuxTenant = this.PlatformInformationService.GetLinuxTenant();
    string linuxStampName = this.PlatformInformationService.GetLinuxStampName();
    string linuxContainerName = this.PlatformInformationService.GetContainerName();

    // On the Linux consumption plan logs go to the console; on the other Linux plans
    // they go to a log file.
    var appServiceLogger = new LinuxAppServiceLogger(writeToConsole: isConsumptionPlan, linuxContainerName, linuxTenant, linuxStampName);

    // Linux does not recognize EventSource messages, so the listener captures them and
    // re-emits them through the logger above. Because the payloads can be large,
    // verbose-level telemetry stays off unless the tracing options explicitly allow it.
    this.eventSourceListener = new EventSourceListener(
        appServiceLogger,
        this.Options.Tracing.AllowVerboseLinuxTelemetry,
        this.TraceHelper,
        this.defaultDurabilityProvider.EventSourceName);
}
/// <inheritdoc />
public void Dispose()
{
    // Either member may be unset, so each one is checked before it is released.
    HttpApiHandler apiHandler = this.HttpApiHandler;
    if (!(apiHandler is null))
    {
        apiHandler.Dispose();
    }

    EventSourceListener listener = this.eventSourceListener;
    if (!(listener is null))
    {
        listener.Dispose();
    }
}
#if FUNCTIONS_V2_OR_GREATER
// Starts the local HTTP RPC server when it is enabled.
// An explicit LocalRpcEndpointEnabled option always wins. Without one, the decision is
// based on the worker runtime: off for in-process .NET and for the gRPC-based runtimes
// (.NET isolated, Java), on for every other runtime, including unknown ones.
private void StartLocalHttpServer()
{
    bool? configuredValue = this.Options.LocalRpcEndpointEnabled;

    bool enableServer;
    if (configuredValue.HasValue)
    {
        enableServer = configuredValue.Value;
    }
    else
    {
        switch (this.PlatformInformationService.GetWorkerRuntimeType())
        {
            // dotnet runs in process; dotnet-isolated and java use a gRPC server instead
            case WorkerRuntimeType.DotNet:
            case WorkerRuntimeType.DotNetIsolated:
            case WorkerRuntimeType.Java:
                enableServer = false;
                break;

            // Python, Node, PowerShell, Unknown and anything else: assume the HTTP server
            default:
                enableServer = true;
                break;
        }
    }

    if (!enableServer)
    {
        return;
    }

    this.HttpApiHandler.StartLocalHttpServerAsync().GetAwaiter().GetResult();
}
// Stops the local HTTP RPC server, blocking until the asynchronous stop has completed.
private void StopLocalHttpServer() =>
    this.HttpApiHandler.StopLocalHttpServerAsync().GetAwaiter().GetResult();
#endif
#if FUNCTIONS_V3_OR_GREATER
// Starts the local gRPC listener, blocking until the asynchronous start has completed.
private void StartLocalGrpcServer() =>
    this.localGrpcListener.StartAsync().GetAwaiter().GetResult();
// Stops the local gRPC listener, blocking until the asynchronous stop has completed.
private void StopLocalGrpcServer() =>
    this.localGrpcListener.StopAsync().GetAwaiter().GetResult();
#endif
// Replaces the configured hub name with the value the name resolver produces for it,
// when the resolver can resolve the whole string. Requires both the options and the
// name resolver to be set and throws InvalidOperationException otherwise.
private void ResolveAppSettingOptions()
{
    if (this.Options == null)
    {
        throw new InvalidOperationException($"{nameof(this.Options)} must be set before resolving app settings.");
    }

    if (this.nameResolver == null)
    {
        throw new InvalidOperationException($"{nameof(this.nameResolver)} must be set before resolving app settings.");
    }

    bool wasResolved = this.nameResolver.TryResolveWholeString(this.Options.HubName, out string resolvedHubName);
    if (!wasResolved)
    {
        // Nothing to substitute: keep the hub name exactly as configured.
        return;
    }

    this.Options.HubName = resolvedHubName;
}
// Performs the late configuration needed when the extension was created through the
// Functions V1 default constructor: applies the "DurableTask" config section to the options,
// takes the name resolver and logger factory from the host config, and then creates the
// trace helper, the Azure Storage durability provider, the lifecycle notification helper,
// the data converters, the HTTP API handler and the typed code provider.
// The body is compiled only for FUNCTIONS_V1; on other targets this method does nothing.
private void InitializeForFunctionsV1(ExtensionConfigContext context)
{
#if FUNCTIONS_V1
context.ApplyConfig(this.Options, "DurableTask");
this.nameResolver = context.Config.NameResolver;
this.loggerFactory = context.Config.LoggerFactory;
// Must run after the options and name resolver are set: it throws otherwise.
this.ResolveAppSettingOptions();
ILogger logger = this.loggerFactory.CreateLogger(LoggerCategoryName);
this.TraceHelper = new EndToEndTraceHelper(logger, this.Options.Tracing.TraceReplayEvents);
this.connectionInfoResolver = new WebJobsConnectionInfoProvider();
this.PlatformInformationService = new DefaultPlatformInformation(this.nameResolver, this.loggerFactory);
// This path always uses the Azure Storage provider factory.
this.durabilityProviderFactory = new AzureStorageDurabilityProviderFactory(
new OptionsWrapper<DurableTaskOptions>(this.Options),
new AzureStorageAccountProvider(this.connectionInfoResolver),
this.nameResolver,
this.loggerFactory,
this.PlatformInformationService);
this.defaultDurabilityProvider = this.durabilityProviderFactory.GetDurabilityProvider();
this.LifeCycleNotificationHelper = this.CreateLifeCycleNotificationHelper();
// Both converters use the built-in settings factories, hence the "true" (default) flag.
var messageSerializerSettingsFactory = new MessageSerializerSettingsFactory();
var errorSerializerSettingsFactory = new ErrorSerializerSettingsFactory();
this.MessageDataConverter = new MessagePayloadDataConverter(messageSerializerSettingsFactory.CreateJsonSerializerSettings(), true);
this.ErrorDataConverter = new MessagePayloadDataConverter(errorSerializerSettingsFactory.CreateJsonSerializerSettings(), true);
this.HttpApiHandler = new HttpApiHandler(this, logger);
this.TypedCodeProvider = new TypedCodeProvider();
this.TypedCodeProvider.Initialize();
#endif
}
// Asks the options to trace their configuration through the trace helper, passing along
// the configuration JSON of the default durability provider.
private void TraceConfigurationSettings() =>
    this.Options.TraceConfiguration(this.TraceHelper, this.defaultDurabilityProvider.ConfigurationJson);
// Chooses the lifecycle notification helper from the options: the Event Grid helper when
// an Event Grid section exists and specifies a topic endpoint or a key setting name,
// otherwise a helper that sends no notifications.
private ILifeCycleNotificationHelper CreateLifeCycleNotificationHelper()
{
    var eventGridOptions = this.Options.Notifications.EventGrid;

    bool eventGridMissing = eventGridOptions == null
        || (string.IsNullOrEmpty(eventGridOptions.TopicEndpoint) && string.IsNullOrEmpty(eventGridOptions.KeySettingName));
    if (eventGridMissing)
    {
        // Fallback: notifications are disabled
        return new NullLifeCycleNotificationHelper();
    }

    return new EventGridLifeCycleNotificationHelper(this.Options, this.nameResolver, this.TraceHelper);
}
/// <summary>
/// Deletes all data stored in the current task hub.
/// </summary>
/// <remarks>The deletion is delegated to the default durability provider.</remarks>
/// <returns>A task representing the async delete operation.</returns>
public Task DeleteTaskHubAsync() => this.defaultDurabilityProvider.DeleteAsync();
/// <summary>
/// Called by the Durable Task Framework: Not used. Always throws.
/// </summary>
/// <param name="creator">This parameter is not used.</param>
/// <exception cref="InvalidOperationException">Thrown on every call.</exception>
void INameVersionObjectManager<TaskOrchestration>.Add(ObjectCreator<TaskOrchestration> creator) =>
    throw new InvalidOperationException("Orchestrations should never be added explicitly.");
/// <summary>
/// Called by the Durable Task Framework: Returns the specified <see cref="TaskOrchestration"/>.
/// </summary>
/// <remarks>
/// Names that begin with '@' produce an entity shim; every other name produces an orchestration shim.
/// </remarks>
/// <param name="name">The name of the orchestration to return.</param>
/// <param name="version">Not used.</param>
/// <returns>An orchestration shim that delegates execution to an orchestrator function.</returns>
TaskOrchestration INameVersionObjectManager<TaskOrchestration>.GetObject(string name, string version)
{
    // The prefix is an identifier marker, not linguistic text, so compare ordinally.
    // A culture-sensitive StartsWith can disagree with the raw characters (for example by
    // ignoring zero-width characters) and depends on the current culture.
    if (name.StartsWith("@", StringComparison.Ordinal))
    {
        return new TaskEntityShim(this, this.defaultDurabilityProvider, name);
    }

    return new TaskOrchestrationShim(this, this.defaultDurabilityProvider, name);
}
/// <summary>
/// Called by the durable task framework: Not used. Always throws.
/// </summary>
/// <param name="creator">This parameter is not used.</param>
/// <exception cref="InvalidOperationException">Thrown on every call.</exception>
void INameVersionObjectManager<TaskActivity>.Add(ObjectCreator<TaskActivity> creator) =>
    throw new InvalidOperationException("Activities should never be added explicitly.");
/// <summary>
/// Called by the Durable Task Framework: resolves the <see cref="TaskActivity"/> for a given name.
/// </summary>
/// <param name="name">The name of the activity to return.</param>
/// <param name="version">Not used.</param>
/// <returns>
/// A <see cref="TaskHttpActivityShim"/> for durable HTTP tasks, a <see cref="TaskActivityShim"/> for a
/// name found in the known activities, or a <see cref="TaskNonexistentActivityShim"/> when the name
/// is not registered.
/// </returns>
TaskActivity INameVersionObjectManager<TaskActivity>.GetObject(string name, string version)
{
    // Durable HTTP tasks are checked first and never consult the activity registry.
    if (IsDurableHttpTask(name))
    {
        return new TaskHttpActivityShim(this, this.durableHttpClient);
    }

    var lookupKey = new FunctionName(name);
    if (this.knownActivities.TryGetValue(lookupKey, out RegisteredFunctionInfo registration))
    {
        return new TaskActivityShim(this, registration.Executor, this.HostLifetimeService, name);
    }

    // Unknown activity name: hand back a placeholder shim instead of failing here.
    return new TaskNonexistentActivityShim(this, name);
}
/// <summary>
/// DTFx activity middleware that stamps the activity function shim with the scheduled task's
/// event ID before the shim runs.
/// </summary>
/// <param name="dispatchContext">A property bag containing useful DTFx context.</param>
/// <param name="next">The handler for running the next middleware in the pipeline.</param>
/// <returns>The task returned by <paramref name="next"/>.</returns>
private Task ActivityMiddleware(DispatchMiddlewareContext dispatchContext, Func<Task> next)
{
    var activityShim = dispatchContext.GetProperty<TaskActivity>() as TaskActivityShim;
    if (activityShim != null)
    {
        TaskScheduledEvent scheduledEvent = dispatchContext.GetProperty<TaskScheduledEvent>();

        // -1 is passed when the dispatch context carries no scheduled event.
        int taskEventId = scheduledEvent == null ? -1 : scheduledEvent.EventId;
        activityShim.SetTaskEventId(taskEventId);
    }

    // Continue down the DTFx pipeline, which triggers the activity shim.
    return next();
}
/// <summary>
/// This DTFx orchestration middleware allows us to initialize Durable Functions-specific context
/// and make the execution happen in a way that plays nice with the Azure Functions execution pipeline.
/// </summary>
/// <remarks>
/// Dispatches whose <see cref="TaskOrchestration"/> is not a <see cref="TaskOrchestrationShim"/> are passed
/// straight to <paramref name="next"/>. Otherwise the context is populated from the
/// <see cref="OrchestrationRuntimeState"/>, the orchestrator is executed (or a failing callback is installed
/// when no function info is registered), and post-execution bookkeeping is performed.
/// </remarks>
/// <param name="dispatchContext">A property bag containing useful DTFx context.</param>
/// <param name="next">The handler for running the next middleware in the pipeline.</param>
/// <returns>A task representing the asynchronous middleware execution.</returns>
/// <exception cref="SessionAbortedException">
/// Thrown when the wrapped function result reports a Functions runtime error, a host-stopping error,
/// or a function timeout error.
/// </exception>
private async Task OrchestrationMiddleware(DispatchMiddlewareContext dispatchContext, Func<Task> next)
{
TaskOrchestrationShim shim = dispatchContext.GetProperty<TaskOrchestration>() as TaskOrchestrationShim;
if (shim == null)
{
// This is not an orchestration - skip.
await next();
return;
}
// Copy instance metadata, history and input from the DTFx runtime state onto the shim's context.
DurableOrchestrationContext context = (DurableOrchestrationContext)shim.Context;
OrchestrationRuntimeState orchestrationRuntimeState = dispatchContext.GetProperty<OrchestrationRuntimeState>();
// ParentInstanceId is only assigned when the runtime state has a parent instance.
if (orchestrationRuntimeState.ParentInstance != null)
{
context.ParentInstanceId = orchestrationRuntimeState.ParentInstance.OrchestrationInstance.InstanceId;
}
context.InstanceId = orchestrationRuntimeState.OrchestrationInstance?.InstanceId;
context.ExecutionId = orchestrationRuntimeState.OrchestrationInstance?.ExecutionId;
context.IsReplaying = orchestrationRuntimeState.ExecutionStartedEvent.IsPlayed;
context.History = orchestrationRuntimeState.Events;
context.RawInput = orchestrationRuntimeState.Input;
RegisteredFunctionInfo info = shim.GetFunctionInfo();
if (info == null)
{
// The shim has no function info: log a warning and install a callback that throws
// OrchestrationFailureException when invoked, then let the pipeline continue.
string message = this.GetInvalidOrchestratorFunctionMessage(context.FunctionName);
this.TraceHelper.ExtensionWarningEvent(
this.Options.HubName,
orchestrationRuntimeState.Name,
orchestrationRuntimeState.OrchestrationInstance.InstanceId,
message);
Func<Task<OrchestrationFailureException>> nonExistentException = () => throw new OrchestrationFailureException(message);
shim.SetFunctionInvocationCallback(nonExistentException);
await next();
}
else
{
// 1. Start the functions invocation pipeline (billing, logging, bindings, and timeout tracking).
WrappedFunctionResult result = await FunctionExecutionHelper.ExecuteFunctionInOrchestrationMiddleware(
info.Executor,
new TriggeredFunctionData
{
TriggerValue = context,
#pragma warning disable CS0618 // Approved for use by this extension
InvokeHandler = async userCodeInvoker =>
{
// We yield control to ensure this code is executed asynchronously relative to WebJobs.
// This ensures WebJobs is able to correctly cancel the invocation in the case of a timeout.
await Task.Yield();
// Record on the context that this handler was actually invoked.
context.ExecutorCalledBack = true;
// 2. Configure the shim with the inner invoker to execute the user code.
shim.SetFunctionInvocationCallback(userCodeInvoker);
// 3. Move to the next stage of the DTFx pipeline to trigger the orchestrator shim.
await next();
// 4. If an activity failed, indicate to the functions Host that this execution failed via an exception
if (context.IsCompleted && context.OrchestrationException != null)
{
context.OrchestrationException.Throw();
}
},
#pragma warning restore CS0618
},
shim,
context,
this.HostLifetimeService.OnStopping);
// These three statuses are reported as an aborted function and turned into a session abort.
if (result.ExecutionStatus == WrappedFunctionResult.FunctionResultStatus.FunctionsRuntimeError
|| result.ExecutionStatus == WrappedFunctionResult.FunctionResultStatus.FunctionsHostStoppingError
|| result.ExecutionStatus == WrappedFunctionResult.FunctionResultStatus.FunctionTimeoutError)
{
this.TraceHelper.FunctionAborted(
this.Options.HubName,
context.FunctionName,
context.InstanceId,
$"An internal error occurred while attempting to execute this function. The execution will be aborted and retried. Details: {result.Exception}",
functionType: FunctionType.Orchestrator);
// This will abort the execution and cause the message to go back onto the queue for re-processing
throw new SessionAbortedException(
$"An internal error occurred while attempting to execute '{context.FunctionName}'.", result.Exception);
}
}
// Trace an "awaited" event when the orchestrator has not completed and is not in a long-running timer.
if (!context.IsCompleted && !context.IsLongRunningTimer)
{
this.TraceHelper.FunctionAwaited(
context.HubName,
context.Name,
FunctionType.Orchestrator,
context.InstanceId,
context.IsReplaying);
}
if (context.IsCompleted &&
context.PreserveUnprocessedEvents)
{
// Reschedule any unprocessed external events so that they can be picked up
// in the next iteration.
context.RescheduleBufferedExternalEvents();
}
// Always the final step for orchestration dispatches, regardless of which branch ran above.
await context.RunDeferredTasks();
}
/// <summary>
/// This DTFx orchestration middleware (for entities) allows us to add context and set state
/// to the entity shim orchestration before it starts executing the actual entity logic.
/// </summary>
/// <param name="dispatchContext">A property bag containing useful DTFx context.</param>
/// <param name="next">The handler for running the next middleware in the pipeline.</param>
private async Task EntityMiddleware(DispatchMiddlewareContext dispatchContext, Func<Task> next)
{
var entityShim = dispatchContext.GetProperty<TaskOrchestration>() as TaskEntityShim;
if (entityShim == null)
{
// This is not an entity - skip.
await next();
return;
}
OrchestrationRuntimeState runtimeState = dispatchContext.GetProperty<OrchestrationRuntimeState>();
DurableEntityContext entityContext = (DurableEntityContext)entityShim.Context;
entityContext.InstanceId = runtimeState.OrchestrationInstance.InstanceId;
entityContext.ExecutionId = runtimeState.OrchestrationInstance.ExecutionId;
entityContext.History = runtimeState.Events;
entityContext.RawInput = runtimeState.Input;
Queue<RequestMessage> lockHolderMessages = null;
try
{
// 1. First time through the history
// we count events, add any under-lock op to the batch, and process lock releases
foreach (HistoryEvent e in runtimeState.Events)
{
switch (e.EventType)
{
case EventType.ExecutionStarted:
entityShim.Rehydrate(runtimeState.Input);
break;
case EventType.EventRaised:
EventRaisedEvent eventRaisedEvent = (EventRaisedEvent)e;
this.TraceHelper.DeliveringEntityMessage(
entityContext.InstanceId,
entityContext.ExecutionId,
e.EventId,
eventRaisedEvent.Name,
eventRaisedEvent.Input);
entityShim.NumberEventsToReceive++;
if (EntityMessageEventNames.IsRequestMessage(eventRaisedEvent.Name))
{
// we are receiving an operation request or a lock request
var requestMessage = this.MessageDataConverter.Deserialize<RequestMessage>(eventRaisedEvent.Input);
IEnumerable<RequestMessage> deliverNow;
if (requestMessage.ScheduledTime.HasValue)
{
if ((requestMessage.ScheduledTime.Value - DateTime.UtcNow) > TimeSpan.FromMilliseconds(100))
{
// message was delivered too early. This can happen if the durability provider imposes
// a limit on the delay. We handle this by rescheduling the message instead of processing it.
deliverNow = Array.Empty<RequestMessage>();
entityShim.AddMessageToBeRescheduled(requestMessage);
}
else
{
// the message is scheduled to be delivered immediately.
// There are no FIFO guarantees for scheduled messages, so we skip the message sorter.
deliverNow = new RequestMessage[] { requestMessage };
}
}
else
{
// run this through the message sorter to help with reordering and duplicate filtering
deliverNow = entityContext.State.MessageSorter.ReceiveInOrder(requestMessage, this.MessageReorderWindow);
}
foreach (var message in deliverNow)
{
if (entityContext.State.LockedBy == message.ParentInstanceId)
{
if (lockHolderMessages == null)
{
lockHolderMessages = new Queue<RequestMessage>();
}
lockHolderMessages.Enqueue(message);
}
else
{
entityContext.State.Enqueue(message);
}
}
}
else if (EntityMessageEventNames.IsReleaseMessage(eventRaisedEvent.Name))
{
// we are receiving a lock release
var message = this.MessageDataConverter.Deserialize<ReleaseMessage>(eventRaisedEvent.Input);
if (entityContext.State.LockedBy == message.ParentInstanceId)
{
this.TraceHelper.EntityLockReleased(
entityContext.HubName,
entityContext.Name,
entityContext.InstanceId,
message.ParentInstanceId,
message.LockRequestId,
isReplay: false);
entityContext.State.LockedBy = null;
}
}
else
{
// this is a continue message.
// Resumes processing of previously queued operations, if any.
entityContext.State.Suspended = false;
}
break;
}
}
// lock holder messages go to the front of the queue
if (lockHolderMessages != null)
{
entityContext.State.PutBack(lockHolderMessages);
}
if (!entityContext.State.Suspended)
{
// 2. We add as many requests from the queue to the batch as possible,
// stopping at lock requests or when the maximum batch size is reached
while (entityContext.State.MayDequeue())
{
if (entityShim.OperationBatch.Count == this.Options.MaxEntityOperationBatchSize)
{
// we have reached the maximum batch size already
// insert a delay after this batch to ensure write back
entityShim.ToBeContinuedWithDelay();
break;
}
var request = entityContext.State.Dequeue();
if (request.IsLockRequest)
{
entityShim.AddLockRequestToBatch(request);
break;
}
else
{
entityShim.AddOperationToBatch(request);
}
}