-
Notifications
You must be signed in to change notification settings - Fork 270
/
DurableTaskListener.cs
102 lines (91 loc) · 3.29 KB
/
DurableTaskListener.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See LICENSE in the project root for license information.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using DurableTask.AzureStorage.Monitoring;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Listeners;
#if !FUNCTIONS_V1
using Microsoft.Azure.WebJobs.Host.Scale;
#endif
namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
{
#if !FUNCTIONS_V1
internal sealed class DurableTaskListener : IListener, IScaleMonitorProvider
#else
internal sealed class DurableTaskListener : IListener
#endif
{
private readonly DurableTaskExtension config;
private readonly string functionId;
private readonly FunctionName functionName;
private readonly FunctionType functionType;
private readonly string connectionName;
#if !FUNCTIONS_V1
private readonly Lazy<IScaleMonitor> scaleMonitor;
#endif
public DurableTaskListener(
DurableTaskExtension config,
string functionId,
FunctionName functionName,
FunctionType functionType,
string connectionName)
{
this.config = config ?? throw new ArgumentNullException(nameof(config));
if (functionName == default(FunctionName))
{
throw new ArgumentNullException(nameof(functionName));
}
this.functionId = functionId;
this.functionName = functionName;
this.functionType = functionType;
this.connectionName = connectionName;
#if !FUNCTIONS_V1
this.scaleMonitor = new Lazy<IScaleMonitor>(() =>
this.config.GetScaleMonitor(
this.functionId,
this.functionName,
this.connectionName));
#endif
}
public Task StartAsync(CancellationToken cancellationToken)
{
return this.config.StartTaskHubWorkerIfNotStartedAsync();
}
public Task StopAsync(CancellationToken cancellationToken)
{
// The actual listener is a task hub worker, which is shared by all orchestration
// and activity function listeners in the function app. The task hub worker
// gets shut down only when all durable functions are shut down.
switch (this.functionType)
{
case FunctionType.Orchestrator:
this.config.DeregisterOrchestrator(this.functionName);
break;
case FunctionType.Entity:
this.config.DeregisterEntity(this.functionName);
break;
case FunctionType.Activity:
this.config.DeregisterActivity(this.functionName);
break;
}
return this.config.StopTaskHubWorkerIfIdleAsync();
}
public void Cancel()
{
this.StopAsync(CancellationToken.None).ConfigureAwait(false).GetAwaiter().GetResult();
}
public void Dispose()
{
}
#if !FUNCTIONS_V1
public IScaleMonitor GetMonitor()
{
return this.scaleMonitor.Value;
}
#endif
}
}