-
Notifications
You must be signed in to change notification settings - Fork 2k
/
SQSAdapterFactory.cs
100 lines (90 loc) · 4.06 KB
/
SQSAdapterFactory.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Orleans.Providers.Streams.Common;
using Orleans.Streams;
using Orleans.Configuration;
using Orleans;
using Orleans.Configuration.Overrides;
using Orleans.Serialization;
namespace OrleansAWSUtils.Streams
{
/// <summary>
/// Factory class for the Amazon SQS based stream provider. It builds <see cref="SQSAdapter"/> instances
/// and hands out the stream queue mapper, queue adapter cache and delivery failure handlers
/// associated with a single named provider.
/// </summary>
public class SQSAdapterFactory : IQueueAdapterFactory
{
    private readonly string providerName;
    private readonly SqsOptions sqsOptions;
    private readonly ClusterOptions clusterOptions;
    private readonly Serializer<SQSBatchContainer> serializer;
    private readonly ILoggerFactory loggerFactory;
    private readonly HashRingBasedStreamQueueMapper streamQueueMapper;
    private readonly IQueueAdapterCache adapterCache;

    /// <summary>
    /// Application level failure handler override.
    /// When left unset, a no-op delivery failure handler is used.
    /// </summary>
    protected Func<QueueId, Task<IStreamFailureHandler>> StreamFailureHandlerFactory { private get; set; }

    /// <summary>
    /// Initializes a new instance of the <see cref="SQSAdapterFactory"/> class and eagerly builds
    /// the stream queue mapper and the queue adapter cache shared by this factory.
    /// </summary>
    /// <param name="name">The stream provider name; also passed to the queue mapper, the cache and the adapter.</param>
    /// <param name="sqsOptions">SQS options; supplies the connection string used by <see cref="CreateAdapter"/>.</param>
    /// <param name="queueMapperOptions">Options passed to the <see cref="HashRingBasedStreamQueueMapper"/>.</param>
    /// <param name="cacheOptions">Options passed to the <see cref="SimpleQueueAdapterCache"/>.</param>
    /// <param name="clusterOptions">Cluster options; the service id is passed to the adapter.</param>
    /// <param name="serializer">Serializer used to obtain the <see cref="SQSBatchContainer"/> serializer.</param>
    /// <param name="loggerFactory">Logger factory passed to the cache and to the adapter.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="sqsOptions"/>, <paramref name="clusterOptions"/> or <paramref name="serializer"/> is <c>null</c>.
    /// </exception>
    public SQSAdapterFactory(
        string name,
        SqsOptions sqsOptions,
        HashRingStreamQueueMapperOptions queueMapperOptions,
        SimpleQueueCacheOptions cacheOptions,
        IOptions<ClusterOptions> clusterOptions,
        Orleans.Serialization.Serializer serializer,
        ILoggerFactory loggerFactory)
    {
        // These three are dereferenced by this class (here or in CreateAdapter); fail fast with a
        // descriptive exception instead of a NullReferenceException at some later point.
        if (sqsOptions == null)
        {
            throw new ArgumentNullException(nameof(sqsOptions));
        }

        if (clusterOptions == null)
        {
            throw new ArgumentNullException(nameof(clusterOptions));
        }

        if (serializer == null)
        {
            throw new ArgumentNullException(nameof(serializer));
        }

        this.providerName = name;
        this.sqsOptions = sqsOptions;
        this.clusterOptions = clusterOptions.Value;
        this.serializer = serializer.GetSerializer<SQSBatchContainer>();
        this.loggerFactory = loggerFactory;
        streamQueueMapper = new HashRingBasedStreamQueueMapper(queueMapperOptions, this.providerName);
        adapterCache = new SimpleQueueAdapterCache(cacheOptions, this.providerName, this.loggerFactory);
    }

    /// <summary>
    /// Init the factory. Installs the default no-op delivery failure handler factory
    /// unless <see cref="StreamFailureHandlerFactory"/> has already been set.
    /// </summary>
    public virtual void Init()
    {
        if (StreamFailureHandlerFactory == null)
        {
            StreamFailureHandlerFactory = CreateDefaultFailureHandler;
        }
    }

    /// <summary>Creates the SQS based adapter.</summary>
    /// <returns>A completed task holding a newly constructed <see cref="SQSAdapter"/>.</returns>
    public virtual Task<IQueueAdapter> CreateAdapter()
    {
        var adapter = new SQSAdapter(
            this.serializer,
            this.streamQueueMapper,
            this.loggerFactory,
            this.sqsOptions.ConnectionString,
            this.clusterOptions.ServiceId,
            this.providerName);
        return Task.FromResult<IQueueAdapter>(adapter);
    }

    /// <summary>Gets the adapter cache that was built when this factory was constructed.</summary>
    /// <returns>The queue adapter cache owned by this factory.</returns>
    public virtual IQueueAdapterCache GetQueueAdapterCache()
    {
        return adapterCache;
    }

    /// <summary>Gets the stream queue mapper that was built when this factory was constructed.</summary>
    /// <returns>The stream queue mapper owned by this factory.</returns>
    public IStreamQueueMapper GetStreamQueueMapper()
    {
        return streamQueueMapper;
    }

    /// <summary>
    /// Creates a delivery failure handler for the specified queue.
    /// </summary>
    /// <param name="queueId">The queue the handler is requested for.</param>
    /// <returns>
    /// The handler produced by <see cref="StreamFailureHandlerFactory"/>, or a no-op handler
    /// when no factory has been installed.
    /// </returns>
    public Task<IStreamFailureHandler> GetDeliveryFailureHandler(QueueId queueId)
    {
        // Init() is virtual and may not have run (or an override may have skipped the base call),
        // so do not assume the handler factory is populated.
        var handlerFactory = StreamFailureHandlerFactory;
        if (handlerFactory == null)
        {
            return CreateDefaultFailureHandler(queueId);
        }

        return handlerFactory(queueId);
    }

    /// <summary>
    /// Creates and initializes an <see cref="SQSAdapterFactory"/> for the named stream provider,
    /// reading the provider's named options from <paramref name="services"/>.
    /// </summary>
    /// <param name="services">The service provider used to resolve options and constructor dependencies.</param>
    /// <param name="name">The stream provider name.</param>
    /// <returns>A factory on which <see cref="Init"/> has already been called.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="services"/> is <c>null</c>.</exception>
    public static SQSAdapterFactory Create(IServiceProvider services, string name)
    {
        if (services == null)
        {
            throw new ArgumentNullException(nameof(services));
        }

        var sqsOptions = services.GetOptionsByName<SqsOptions>(name);
        var queueMapperOptions = services.GetOptionsByName<HashRingStreamQueueMapperOptions>(name);
        var cacheOptions = services.GetOptionsByName<SimpleQueueCacheOptions>(name);
        IOptions<ClusterOptions> clusterOptions = services.GetProviderClusterOptions(name);

        // Explicit arguments are listed in constructor order for readability; the constructor
        // parameters not supplied here (serializer, logger factory) are resolved from the container.
        var factory = ActivatorUtilities.CreateInstance<SQSAdapterFactory>(
            services,
            name,
            sqsOptions,
            queueMapperOptions,
            cacheOptions,
            clusterOptions);
        factory.Init();
        return factory;
    }

    /// <summary>Default handler factory: yields a no-op delivery failure handler for any queue.</summary>
    private static Task<IStreamFailureHandler> CreateDefaultFailureHandler(QueueId queueId)
    {
        return Task.FromResult<IStreamFailureHandler>(new NoOpStreamDeliveryFailureHandler());
    }
}
}