This repository has been archived by the owner on Jul 21, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 19
/
DistributedCacheStoreLocator.cs
155 lines (130 loc) · 6.34 KB
/
DistributedCacheStoreLocator.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
using Microsoft.Extensions.Options;
using Microsoft.ServiceFabric.Services.Client;
using Microsoft.ServiceFabric.Services.Communication.Client;
using Microsoft.ServiceFabric.Services.Remoting.Client;
using Microsoft.ServiceFabric.Services.Remoting.V2;
using Microsoft.ServiceFabric.Services.Remoting.V2.FabricTransport.Client;
using Microsoft.VisualStudio.Threading;
using System;
using System.Collections.Concurrent;
using System.Fabric;
using System.Fabric.Description;
using System.Fabric.Query;
using System.Linq;
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;
namespace SoCreate.Extensions.Caching.ServiceFabric
{
/// <summary>
/// Finds the cache store service in the Service Fabric cluster and hands out
/// remoting proxies bound to the partition that owns a given cache key.
/// </summary>
/// <remarks>
/// The service URI is taken from <c>ServiceFabricCacheOptions.CacheStoreServiceUri</c> when it is
/// set; otherwise every application and service in the cluster is probed for a service whose
/// "CacheStore" property equals "true". The resolved URI, the partition list and the created
/// proxies are cached for the lifetime of this instance. A failed lookup is not cached: the
/// next call tries again.
/// </remarks>
class DistributedCacheStoreLocator : IDistributedCacheStoreLocator
{
    private const string CacheStoreProperty = "CacheStore";
    private const string CacheStorePropertyValue = "true";
    private const string ListenerName = "CacheStoreServiceListener";

    // Page size requested when enumerating applications and services.
    private const int QueryPageSize = 50;

    // Not readonly on purpose: both lazies are swapped for a fresh instance when they fault.
    private AsyncLazy<Uri> _serviceUri;
    private AsyncLazy<ServicePartitionList> _partitionList;

    private readonly string _endpointName;
    private readonly TimeSpan? _retryTimeout;
    private readonly FabricClient _fabricClient;
    private readonly ConcurrentDictionary<Guid, IServiceFabricCacheStoreService> _cacheStores;
    private readonly ServiceFabricCacheOptions _options;
    private readonly IServiceRemotingMessageSerializationProvider _serializationProvider;

    /// <summary>
    /// Creates a locator configured from <paramref name="options"/>.
    /// </summary>
    /// <param name="options">Cache options; the endpoint name falls back to "CacheStoreServiceListener" when not configured.</param>
    /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
    public DistributedCacheStoreLocator(IOptions<ServiceFabricCacheOptions> options)
    {
        if (options == null)
        {
            throw new ArgumentNullException(nameof(options));
        }

        _options = options.Value;
        _endpointName = _options.CacheStoreEndpointName ?? ListenerName;
        _retryTimeout = _options.RetryTimeout;
        _serializationProvider = _options.SerializationProvider;

        _fabricClient = new FabricClient();
        _cacheStores = new ConcurrentDictionary<Guid, IServiceFabricCacheStoreService>();

        _serviceUri = new AsyncLazy<Uri>(LocateCacheStoreAsync);
        _partitionList = new AsyncLazy<ServicePartitionList>(GetPartitionListAsync);
    }

    /// <summary>
    /// Returns the cache store proxy for the partition that owns <paramref name="cacheKey"/>.
    /// One proxy is created per partition id and reused afterwards.
    /// </summary>
    /// <param name="cacheKey">The cache key to route.</param>
    /// <returns>A remoting proxy targeting the owning partition.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="cacheKey"/> is null.</exception>
    /// <exception cref="CacheStoreNotFoundException">No cache store service could be located.</exception>
    public async Task<IServiceFabricCacheStoreService> GetCacheStoreProxy(string cacheKey)
    {
        if (cacheKey == null)
        {
            throw new ArgumentNullException(nameof(cacheKey));
        }

        var partitionInformation = await GetPartitionInformationForCacheKeyAsync(cacheKey).ConfigureAwait(false);
        var serviceUri = await GetServiceUriAsync().ConfigureAwait(false);

        return _cacheStores.GetOrAdd(partitionInformation.Id, key =>
        {
            var info = (Int64RangePartitionInformation)partitionInformation;
            var resolvedPartition = new ServicePartitionKey(info.LowKey);
            var retrySettings = _retryTimeout.HasValue ? new OperationRetrySettings(_retryTimeout.Value) : null;

            var proxyFactory = new ServiceProxyFactory((c) =>
            {
                return new FabricTransportServiceRemotingClientFactory(serializationProvider: _serializationProvider);
            }, retrySettings);

            return proxyFactory.CreateServiceProxy<IServiceFabricCacheStoreService>(serviceUri, resolvedPartition, TargetReplicaSelector.Default, _endpointName);
        });
    }

    /// <summary>
    /// Awaits the cached service URI. If the lookup faulted, the lazy is replaced so that
    /// the next call runs discovery again, and the original exception is rethrown.
    /// </summary>
    private async Task<Uri> GetServiceUriAsync()
    {
        var lazy = _serviceUri;
        try
        {
            return await lazy.GetValueAsync().ConfigureAwait(false);
        }
        catch
        {
            // A faulted AsyncLazy keeps returning the same failure; only the caller that still
            // sees the faulted instance swaps it, so concurrent failures do not stack up.
            System.Threading.Interlocked.CompareExchange(ref _serviceUri, new AsyncLazy<Uri>(LocateCacheStoreAsync), lazy);
            throw;
        }
    }

    /// <summary>
    /// Awaits the cached partition list, resetting the lazy on failure in the same way as
    /// <see cref="GetServiceUriAsync"/>.
    /// </summary>
    private async Task<ServicePartitionList> GetPartitionsAsync()
    {
        var lazy = _partitionList;
        try
        {
            return await lazy.GetValueAsync().ConfigureAwait(false);
        }
        catch
        {
            System.Threading.Interlocked.CompareExchange(ref _partitionList, new AsyncLazy<ServicePartitionList>(GetPartitionListAsync), lazy);
            throw;
        }
    }

    /// <summary>
    /// Maps a cache key to the partition whose Int64 range contains the first eight bytes
    /// of the key's MD5 hash.
    /// </summary>
    /// <exception cref="InvalidOperationException">Zero or more than one partition range contains the hashed key.</exception>
    private async Task<ServicePartitionInformation> GetPartitionInformationForCacheKeyAsync(string cacheKey)
    {
        long key;

        // MD5 only spreads keys over the partition ranges here. ASCII encoding is kept so
        // existing keys keep mapping to the same partition (non-ASCII characters encode as '?').
        using (var md5 = MD5.Create())
        {
            var hash = md5.ComputeHash(Encoding.ASCII.GetBytes(cacheKey));
            key = BitConverter.ToInt64(hash, 0);
        }

        var partitions = await GetPartitionsAsync().ConfigureAwait(false);
        var partition = partitions.Single(p =>
        {
            var range = (Int64RangePartitionInformation)p.PartitionInformation;
            return range.LowKey <= key && key <= range.HighKey;
        });

        return partition.PartitionInformation;
    }

    /// <summary>
    /// Queries the cluster for the partitions of the cache store service.
    /// </summary>
    private async Task<ServicePartitionList> GetPartitionListAsync()
    {
        var serviceUri = await GetServiceUriAsync().ConfigureAwait(false);
        return await _fabricClient.QueryManager.GetPartitionListAsync(serviceUri).ConfigureAwait(false);
    }

    /// <summary>
    /// Resolves the cache store service URI: the configured URI when present, otherwise the
    /// first service found by probing every application in the cluster.
    /// </summary>
    /// <exception cref="CacheStoreNotFoundException">No cache store was found or the cluster query failed.</exception>
    private async Task<Uri> LocateCacheStoreAsync()
    {
        if (_options.CacheStoreServiceUri != null)
        {
            return _options.CacheStoreServiceUri;
        }

        try
        {
            var query = new ApplicationQueryDescription() { MaxResults = QueryPageSize };
            do
            {
                var apps = await _fabricClient.QueryManager.GetApplicationPagedListAsync(query).ConfigureAwait(false);
                query.ContinuationToken = apps.ContinuationToken;

                foreach (var app in apps)
                {
                    var serviceName = await LocateCacheStoreServiceInApplicationAsync(app.ApplicationName).ConfigureAwait(false);
                    if (serviceName != null)
                    {
                        return serviceName;
                    }
                }
            }
            while (!string.IsNullOrEmpty(query.ContinuationToken));
        }
        catch (Exception ex)
        {
            // Callers still get CacheStoreNotFoundException, but the real cause is traced
            // instead of being lost.
            System.Diagnostics.Trace.TraceWarning("Cache store discovery failed while enumerating applications: {0}", ex);
        }

        throw new CacheStoreNotFoundException("Cache store not found in Service Fabric cluster. Try setting the 'CacheStoreServiceUri' configuration option to the location of your cache store.");
    }

    /// <summary>
    /// Probes the services of one application and returns the first one flagged as a cache
    /// store, or null when none is found or the query fails.
    /// </summary>
    private async Task<Uri> LocateCacheStoreServiceInApplicationAsync(Uri applicationName)
    {
        try
        {
            var query = new ServiceQueryDescription(applicationName) { MaxResults = QueryPageSize };
            do
            {
                var services = await _fabricClient.QueryManager.GetServicePagedListAsync(query).ConfigureAwait(false);
                query.ContinuationToken = services.ContinuationToken;

                foreach (var service in services)
                {
                    if (await IsCacheStoreAsync(service.ServiceName).ConfigureAwait(false))
                    {
                        return service.ServiceName;
                    }
                }
            }
            while (!string.IsNullOrEmpty(query.ContinuationToken));
        }
        catch
        {
            // Best effort: a failure in one application must not stop the scan of the others.
        }

        return null;
    }

    /// <summary>
    /// Returns true when the service has a "CacheStore" property whose value is "true".
    /// Any failure to read the property counts as "not a cache store".
    /// </summary>
    private async Task<bool> IsCacheStoreAsync(Uri serviceName)
    {
        try
        {
            var property = await _fabricClient.PropertyManager.GetPropertyAsync(serviceName, CacheStoreProperty).ConfigureAwait(false);
            return property.GetValue<string>() == CacheStorePropertyValue;
        }
        catch
        {
            // Best effort: the property lookup is a probe, so a failed read means "no".
        }

        return false;
    }
}
}