-
Notifications
You must be signed in to change notification settings - Fork 204
/
StorageScheduleMonitor.cs
147 lines (135 loc) · 6.05 KB
/
StorageScheduleMonitor.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Storage;
using Microsoft.Azure.Storage.Blob;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Logging;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
namespace Microsoft.Azure.WebJobs.Extensions.Timers
{
/// <summary>
/// <see cref="ScheduleMonitor"/> that stores schedule information in blob storage.
/// </summary>
public class StorageScheduleMonitor : ScheduleMonitor
{
private const string HostContainerName = "azure-webjobs-hosts";
private readonly DistributedLockManagerContainerProvider _lockContainerProvider;
private readonly JsonSerializer _serializer;
private readonly ILogger _logger;
private readonly IHostIdProvider _hostIdProvider;
private readonly IConfiguration _configuration;
private CloudBlobDirectory _timerStatusDirectory;
/// <summary>
/// Constructs a new instance.
/// </summary>
/// <param name="lockContainerProvider"></param>
/// <param name="hostIdProvider"></param>
/// <param name="configuration"></param>
/// <param name="loggerFactory"></param>
public StorageScheduleMonitor(DistributedLockManagerContainerProvider lockContainerProvider, IHostIdProvider hostIdProvider,
IConfiguration configuration, ILoggerFactory loggerFactory)
{
_lockContainerProvider = lockContainerProvider ?? throw new ArgumentNullException(nameof(lockContainerProvider));
_configuration = configuration ?? throw new ArgumentNullException(nameof(configuration));
_hostIdProvider = hostIdProvider ?? throw new ArgumentNullException(nameof(hostIdProvider));
_logger = loggerFactory.CreateLogger(LogCategories.CreateTriggerCategory("Timer"));
JsonSerializerSettings settings = new JsonSerializerSettings
{
DateFormatHandling = DateFormatHandling.IsoDateFormat
};
_serializer = JsonSerializer.Create(settings);
}
/// <summary>
/// Gets the blob directory where timer statuses will be stored.
/// </summary>
internal CloudBlobDirectory TimerStatusDirectory
{
get
{
// We have to delay create the blob directory since we require the JobHost ID, and that will only
// be available AFTER the host as been started
if (_timerStatusDirectory == null)
{
string hostId = _hostIdProvider.GetHostIdAsync(CancellationToken.None).GetAwaiter().GetResult();
if (string.IsNullOrEmpty(hostId))
{
throw new InvalidOperationException("Unable to determine host ID.");
}
CloudBlobContainer container;
if (_lockContainerProvider.InternalContainer != null)
{
container = _lockContainerProvider.InternalContainer;
}
else
{
var connectionString = _configuration.GetWebJobsConnectionString(ConnectionStringNames.Storage);
CloudStorageAccount storageAccount = CloudStorageAccount.Parse(connectionString);
CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient();
container = blobClient.GetContainerReference(HostContainerName);
}
string timerStatusDirectoryPath = string.Format("timers/{0}", hostId);
_timerStatusDirectory = container.GetDirectoryReference(timerStatusDirectoryPath);
}
return _timerStatusDirectory;
}
}
/// <inheritdoc/>
public override async Task<ScheduleStatus> GetStatusAsync(string timerName)
{
CloudBlockBlob statusBlob = GetStatusBlobReference(timerName);
try
{
string statusLine = await statusBlob.DownloadTextAsync();
ScheduleStatus status;
using (StringReader stringReader = new StringReader(statusLine))
{
status = (ScheduleStatus)_serializer.Deserialize(stringReader, typeof(ScheduleStatus));
}
return status;
}
catch (StorageException exception)
{
if (exception.RequestInformation != null &&
exception.RequestInformation.HttpStatusCode == 404)
{
// we haven't recorded a status yet
return null;
}
throw;
}
}
/// <inheritdoc/>
public override async Task UpdateStatusAsync(string timerName, ScheduleStatus status)
{
string statusLine;
using (StringWriter stringWriter = new StringWriter())
{
_serializer.Serialize(stringWriter, status);
statusLine = stringWriter.ToString();
}
try
{
CloudBlockBlob statusBlob = GetStatusBlobReference(timerName);
await statusBlob.UploadTextAsync(statusLine);
}
catch (Exception ex)
{
// best effort
_logger.LogError(ex, $"Function '{timerName}' failed to update the timer trigger status.");
}
}
private CloudBlockBlob GetStatusBlobReference(string timerName)
{
// Path to the status blob is:
// timers/{hostId}/{timerName}/status
string blobName = string.Format("{0}/status", timerName);
return TimerStatusDirectory.GetBlockBlobReference(blobName);
}
}
}