-
Notifications
You must be signed in to change notification settings - Fork 2k
/
AzureQueueStreamProviderUtils.cs
91 lines (84 loc) · 4.26 KB
/
AzureQueueStreamProviderUtils.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Orleans.AzureUtils;
using Orleans.Configuration;
using Orleans.Streams;
namespace Orleans.Providers.Streams.AzureQueue
{
    /// <summary>
    /// Utility functions for the Azure Queue persistent stream provider.
    /// </summary>
    public class AzureQueueStreamProviderUtils
    {
        /// <summary>
        /// Generates the default Azure queue names for a stream provider.
        /// </summary>
        /// <remarks>
        /// The names are obtained by enumerating every queue reported by a
        /// <see cref="HashRingBasedStreamQueueMapper"/> built with default
        /// <see cref="HashRingStreamQueueMapperOptions"/> and prefixing each one with
        /// <paramref name="serviceId"/> followed by a dash.
        /// </remarks>
        /// <param name="serviceId">The service ID placed in front of every generated queue name.</param>
        /// <param name="providerName">The stream provider name handed to the queue mapper.</param>
        /// <returns>One name of the form <c>{serviceId}-{queue}</c> for each queue reported by the mapper.</returns>
        public static List<string> GenerateDefaultAzureQueueNames(string serviceId, string providerName)
        {
            var defaultQueueMapper = new HashRingBasedStreamQueueMapper(new HashRingStreamQueueMapperOptions(), providerName);
            return defaultQueueMapper.GetAllQueues()
                .Select(queueId => $"{serviceId}-{queueId}")
                .ToList();
        }

        /// <summary>
        /// Helper method for testing. Deletes all the given queues, connecting to storage
        /// with the supplied connection string.
        /// </summary>
        /// <param name="loggerFactory">The logger factory passed to each queue data manager.</param>
        /// <param name="azureQueueNames">The names of the Azure queues to delete.</param>
        /// <param name="storageConnectionString">The Azure storage connection string.</param>
        /// <returns>A task that completes once every delete operation has completed.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="azureQueueNames"/> is <c>null</c>.</exception>
        public static async Task DeleteAllUsedAzureQueues(ILoggerFactory loggerFactory, List<string> azureQueueNames, string storageConnectionString)
        {
            var options = new AzureQueueOptions();
            options.QueueServiceClient = new(storageConnectionString);
            await DeleteAllUsedAzureQueues(loggerFactory, azureQueueNames, options);
        }

        /// <summary>
        /// Helper method for testing. Deletes all the given queues using the supplied queue options.
        /// </summary>
        /// <param name="loggerFactory">The logger factory passed to each queue data manager.</param>
        /// <param name="azureQueueNames">The names of the Azure queues to delete.</param>
        /// <param name="queueOptions">The Azure queue options passed to each queue data manager.</param>
        /// <returns>A task that completes once every delete operation has completed.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="azureQueueNames"/> is <c>null</c>.</exception>
        public static async Task DeleteAllUsedAzureQueues(ILoggerFactory loggerFactory, List<string> azureQueueNames, AzureQueueOptions queueOptions)
        {
            // Fail with a descriptive argument error instead of a bare NullReferenceException from the loop.
            if (azureQueueNames is null)
            {
                throw new ArgumentNullException(nameof(azureQueueNames));
            }

            // Start every delete first, then await them together so the operations overlap.
            var deleteTasks = new List<Task>(azureQueueNames.Count);
            foreach (var queueName in azureQueueNames)
            {
                var manager = new AzureQueueDataManager(loggerFactory, queueName, queueOptions);
                deleteTasks.Add(manager.DeleteQueue());
            }

            await Task.WhenAll(deleteTasks);
        }

        /// <summary>
        /// Helper method for testing. Clears all messages in all the given queues, connecting to storage
        /// with the supplied connection string.
        /// </summary>
        /// <param name="loggerFactory">The logger factory passed to each queue data manager.</param>
        /// <param name="azureQueueNames">The names of the Azure queues to clear.</param>
        /// <param name="storageConnectionString">The Azure storage connection string.</param>
        /// <returns>A task that completes once every clear operation has completed.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="azureQueueNames"/> is <c>null</c>.</exception>
        public static async Task ClearAllUsedAzureQueues(ILoggerFactory loggerFactory, List<string> azureQueueNames, string storageConnectionString)
        {
            var options = new AzureQueueOptions();
            options.QueueServiceClient = new(storageConnectionString);
            await ClearAllUsedAzureQueues(loggerFactory, azureQueueNames, options);
        }

        /// <summary>
        /// Helper method for testing. Clears all messages in all the given queues using the supplied queue options.
        /// </summary>
        /// <param name="loggerFactory">The logger factory passed to each queue data manager.</param>
        /// <param name="azureQueueNames">The names of the Azure queues to clear.</param>
        /// <param name="queueOptions">The Azure queue options passed to each queue data manager.</param>
        /// <returns>A task that completes once every clear operation has completed.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="azureQueueNames"/> is <c>null</c>.</exception>
        public static async Task ClearAllUsedAzureQueues(ILoggerFactory loggerFactory, List<string> azureQueueNames, AzureQueueOptions queueOptions)
        {
            // Fail with a descriptive argument error instead of a bare NullReferenceException from the loop.
            if (azureQueueNames is null)
            {
                throw new ArgumentNullException(nameof(azureQueueNames));
            }

            // Start every clear first, then await them together so the operations overlap.
            var clearTasks = new List<Task>(azureQueueNames.Count);
            foreach (var queueName in azureQueueNames)
            {
                var manager = new AzureQueueDataManager(loggerFactory, queueName, queueOptions);
                clearTasks.Add(manager.ClearQueue());
            }

            await Task.WhenAll(clearTasks);
        }
    }
}