-
Notifications
You must be signed in to change notification settings - Fork 0
/
TerminateAndCleanup.cs
173 lines (145 loc) · 6.85 KB
/
TerminateAndCleanup.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Azure.WebJobs;
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Threading.Tasks;
using DurableTask.Core;
using Microsoft.Extensions.Logging;
using Crony.Models;
using Crony;
using Azure.Storage.Blobs;
using Newtonsoft.Json;
using Azure.Storage.Blobs.Models;
using System.ComponentModel;
using System.Reflection.Metadata;
namespace Durable.Crony.Microservice
{
public static class TerminateAndCleanup
{
[Deterministic]
[FunctionName("OrchestrateCompletionWebook")]
public static async Task OrchestrateCompletionWebook([OrchestrationTrigger] IDurableOrchestrationContext context,
ILogger logger)
{
string name = context.GetInput<string>();
Microsoft.Azure.WebJobs.Extensions.DurableTask.RetryOptions ro = new(TimeSpan.FromSeconds(10), 5)
{
BackoffCoefficient = 1.2
};
Webhook webhook = await context.CallActivityWithRetryAsync<Webhook>(nameof(GetWebhook), ro, name);
if(webhook == null)
{
throw new Exception("Webhook retrieval error");
}
DurableHttpRequest durquest = new(webhook.HttpMethod,
new Uri(webhook.Url),
content: webhook.Content,
httpRetryOptions: new HttpRetryOptions(TimeSpan.FromSeconds(webhook.RetryOptions.Interval), webhook.RetryOptions.MaxNumberOfAttempts)
{
BackoffCoefficient = webhook.RetryOptions.BackoffCoefficient,
MaxRetryInterval = TimeSpan.FromSeconds(webhook.RetryOptions.MaxRetryInterval),
StatusCodesToRetry = webhook.GetRetryEnabledStatusCodes()
},
asynchronousPatternEnabled: webhook.PollIf202,
timeout: TimeSpan.FromSeconds(webhook.Timeout));
foreach (var h in webhook.Headers)
{
durquest.Headers.Add(h.Key, h.Value);
}
try
{
await context.CallHttpAsync(durquest);
}
catch (Exception ex)
{
context.SetOutput($"Webhook call error: {ex.Message}");
}
await context.CallActivityWithRetryAsync<Webhook>(nameof(DeleteWebhook), ro, name);
}
public static async Task CompleteTimer(IDurableOrchestrationContext context)
{
await context.CallSubOrchestratorAsync("OrchestrateCompletionWebook", $"Completion_{context.InstanceId}", context.InstanceId);
}
[FunctionName(nameof(GetWebhook))]
public static async Task<Webhook> GetWebhook([ActivityTrigger] string timerName)
{
BlobServiceClient service = new(Environment.GetEnvironmentVariable("AzureWebJobsStorage"));
BlobContainerClient container = service.GetBlobContainerClient("crony-webhooks");
BlobClient blobClient = container.GetBlobClient(timerName);
BlobDownloadResult downloadResult;
try
{
downloadResult = await blobClient.DownloadContentAsync();
}
catch
{
return null;
}
return JsonConvert.DeserializeObject<Webhook>(downloadResult.Content.ToString());
}
[FunctionName(nameof(DeleteWebhook))]
public static async Task DeleteWebhook([ActivityTrigger] string timerName)
{
BlobServiceClient service = new(Environment.GetEnvironmentVariable("AzureWebJobsStorage"));
BlobContainerClient container = service.GetBlobContainerClient("crony-webhooks");
BlobClient blobClient = container.GetBlobClient(timerName);
await blobClient.DeleteAsync();
}
[FunctionName(nameof(IsReady))]
public static async Task<bool?> IsReady([ActivityTrigger] string timerName,
[DurableClient] IDurableClient client)
{
DurableOrchestrationStatus status = await client.GetStatusAsync(timerName, showInput: false);
if (status == null)
{
return null;
}
return status.RuntimeStatus == OrchestrationRuntimeStatus.Terminated
|| status.RuntimeStatus == OrchestrationRuntimeStatus.Completed
|| status.RuntimeStatus == OrchestrationRuntimeStatus.Failed;
}
[FunctionName(nameof(CancelTimer))]
public static async Task<HttpResponseMessage> CancelTimer([HttpTrigger(AuthorizationLevel.Anonymous, "delete", Route = "CancelTimer/{timerName}")] HttpRequestMessage req,
[DurableClient] IDurableOrchestrationClient client,
string timerName)
{
await client.TerminateAsync(timerName, null);
await DeleteWebhook(timerName);
return new HttpResponseMessage(System.Net.HttpStatusCode.OK);
}
/// <summary>
/// Cleanup timer trigger daily at 01:00
/// </summary>
[FunctionName(nameof(CleanupTimerTrigger))]
public static async Task CleanupTimerTrigger([TimerTrigger("0 0 1 * * *")] TimerInfo myTimer,
[DurableClient] IDurableOrchestrationClient client,
ILogger logger)
{
try
{
//clear non-failed history
await client.PurgeInstanceHistoryAsync(DateTime.MinValue, DateTime.UtcNow.AddDays(-1),
new List<OrchestrationStatus>
{
OrchestrationStatus.Completed,
OrchestrationStatus.Terminated
});
//clear failed history
await client.PurgeInstanceHistoryAsync(
DateTime.MinValue, DateTime.UtcNow.AddDays(-7),
new List<OrchestrationStatus>
{
OrchestrationStatus.Failed,
OrchestrationStatus.Canceled
});
}
catch (Exception ex)
{
Exception x = ex.GetBaseException();
logger.LogError(null, x.Message + " - " + x.GetType().Name, null);
}
}
}
}