/
ConsulServerRegistryConnector.cs
277 lines (234 loc) · 10.9 KB
/
ConsulServerRegistryConnector.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
using Consul;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace DarkRift.Server.Plugins.ServerRegistryConnectors.Consul
{
    /// <summary>
    ///     DarkRift ServerRegistryConnector plugin for Consul.
    /// </summary>
    /// <remarks>
    ///     The server registers itself as a Consul service whose service ID is its numeric server ID and whose
    ///     <c>group:</c> tag names its group. It then periodically reads the Consul catalog to discover other
    ///     servers joining and leaving.
    /// </remarks>
    public class ConsulServerRegistryConnector : ServerRegistryConnector
    {
        /// <summary>
        ///     The Consul KV key holding the next server ID to hand out.
        /// </summary>
        private const string NextIdKey = "darkrift-2/next-id";

        /// <summary>
        ///     The prefix of the Consul service tag that carries the server's group name.
        /// </summary>
        private const string GroupTagPrefix = "group:";

        /// <summary>
        ///     The maximum number of compare-and-swap attempts made when allocating a server ID.
        /// </summary>
        private const int MaxIdAllocationAttempts = 10;

        /// <summary>
        ///     The delay and period, in milliseconds, of the timer that refreshes the service list from Consul.
        /// </summary>
        private const int ServiceRefreshIntervalMs = 10000;

        /// <inheritdoc/>
        public override bool ThreadSafe => true;

        /// <inheritdoc/>
        public override Version Version => new Version(1, 0, 0);

        /// <summary>
        ///     The URL to set as the Consul health check for the server.
        /// </summary>
        private readonly string healthCheckUrl = "http://localhost:10666/health";

        /// <summary>
        ///     The poll interval of the Consul health check for the server.
        /// </summary>
        private readonly TimeSpan healthCheckPollInterval = TimeSpan.FromMilliseconds(5000);

        /// <summary>
        ///     The maximum time the Consul health check for the server can be failing for before the server is deregistered.
        /// </summary>
        /// <remarks>
        ///     Minimum 1m, granularity ~30 seconds.
        /// </remarks>
        private readonly TimeSpan healthCheckTimeout = TimeSpan.FromSeconds(60);

        /// <summary>
        ///     The service name to register as in Consul.
        /// </summary>
        private readonly string serviceName = "darkrift";

        /// <summary>
        ///     The client to connect to Consul via.
        /// </summary>
        private readonly ConsulClient client;

        /// <summary>
        ///     Creates the connector from its plugin settings.
        /// </summary>
        /// <param name="pluginLoadData">
        ///     The load data. The optional settings read are <c>consulAddress</c>, <c>consulDatacenter</c>,
        ///     <c>consulToken</c>, <c>healthCheckUrl</c>, <c>healthCheckPollIntervalMs</c>,
        ///     <c>healthCheckTimeoutMs</c> and <c>serviceName</c>.
        /// </param>
        /// <exception cref="InvalidOperationException">
        ///     Thrown if <c>healthCheckTimeoutMs</c> is less than one minute.
        /// </exception>
        public ConsulServerRegistryConnector(ServerRegistryConnectorLoadData pluginLoadData) : base(pluginLoadData)
        {
            var settings = pluginLoadData.Settings;

            // Parse and validate our own settings before creating the client so that an invalid
            // configuration throws without leaving an undisposed ConsulClient behind.
            if (settings["healthCheckUrl"] != null)
            {
                healthCheckUrl = settings["healthCheckUrl"];
            }

            if (settings["healthCheckPollIntervalMs"] != null)
            {
                healthCheckPollInterval = TimeSpan.FromMilliseconds(int.Parse(settings["healthCheckPollIntervalMs"]));
            }

            if (settings["healthCheckTimeoutMs"] != null)
            {
                healthCheckTimeout = TimeSpan.FromMilliseconds(int.Parse(settings["healthCheckTimeoutMs"]));

                // The deregistration timeout has a minimum of one minute.
                if (healthCheckTimeout < TimeSpan.FromMinutes(1))
                {
                    throw new InvalidOperationException("healthCheckTimeout property cannot be less than 1 minute.");
                }
            }

            if (settings["serviceName"] != null)
            {
                serviceName = settings["serviceName"];
            }

            client = new ConsulClient(configuration =>
            {
                if (settings["consulAddress"] != null)
                {
                    configuration.Address = new Uri(settings["consulAddress"]);
                }

                if (settings["consulDatacenter"] != null)
                {
                    configuration.Datacenter = settings["consulDatacenter"];
                }

                if (settings["consulToken"] != null)
                {
                    configuration.Token = settings["consulToken"];
                }
            });
        }

        /// <summary>
        ///     Pulls the current services from Consul and updates the server with any changes to that list.
        /// </summary>
        /// <remarks>
        ///     A failure to query Consul is logged and the refresh is skipped. Catalog entries whose service ID
        ///     is not a ushort are skipped with a warning, as are entries without a group tag. Only the first
        ///     entry is used for a duplicated ID.
        /// </remarks>
        /// <returns>A task object for the operation.</returns>
        protected async Task FetchServices()
        {
            Logger.Trace($"Refreshing services (I'm server {RemoteServerManager.ServerID}).");

            // Query Consul for the current list of services
            // TODO the library we use doesn't seem to allow us to get only services with a passing health check
            QueryResult<CatalogService[]> result;
            try
            {
                result = await client.Catalog.Service(serviceName).ConfigureAwait(false);
            }
            catch (Exception e)
            {
                Logger.Error("Failed to fetch services from Consul as an exception occurred.", e);
                return;
            }

            CatalogService[] services = result.Response ?? new CatalogService[0];

            // Map to ushort IDs. Entries that are not valid server IDs are skipped rather than aborting the
            // whole refresh. The same ID can be listed more than once (e.g. registered on several nodes);
            // only the first entry is kept.
            Dictionary<ushort, CatalogService> parsedServices = new Dictionary<ushort, CatalogService>();
            foreach (CatalogService service in services)
            {
                if (!ushort.TryParse(service.ServiceID, out ushort serviceId))
                {
                    Logger.Warning($"Ignoring Consul service '{service.ServiceID}' as its ID is not a valid server ID.");
                    continue;
                }

                if (!parsedServices.ContainsKey(serviceId))
                {
                    parsedServices.Add(serviceId, service);
                }
            }

            // Snapshot all known services once. This keeps the diff below O(n + m) and means it is not
            // re-evaluated while HandleServerJoin/HandleServerLeave are changing the server's state.
            // TODO if a server isn't in a group we're connected with then they're not returned here which causes us to perpetually discover them
            HashSet<ushort> knownServices = new HashSet<ushort>(
                RemoteServerManager.GetAllGroups()
                                   .SelectMany(g => g.GetAllRemoteServers())
                                   .Select(s => s.ID));

            // Diff the current services against the known services, materializing before any mutation
            List<ushort> joined = parsedServices.Keys.Where(k => !knownServices.Contains(k)).ToList();
            List<ushort> left = knownServices.Where(k => !parsedServices.ContainsKey(k)).ToList();

            foreach (ushort joinedID in joined)
            {
                CatalogService service = parsedServices[joinedID];

                // Look the group tag up by prefix rather than assuming it is the first tag.
                string groupTag = service.ServiceTags?.FirstOrDefault(t => t != null && t.StartsWith(GroupTagPrefix, StringComparison.Ordinal));
                if (groupTag == null)
                {
                    Logger.Warning($"Ignoring server {joinedID} as it has no '{GroupTagPrefix}' tag in Consul.");
                    continue;
                }

                string group = groupTag.Substring(GroupTagPrefix.Length);
                HandleServerJoin(joinedID, group, service.Address, (ushort)service.ServicePort, service.ServiceMeta);
            }

            //TODO consider just a set method instead of/as well as join/leave
            foreach (ushort leftID in left)
            {
                HandleServerLeave(leftID);
            }
        }

        /// <summary>
        ///     Deregisters this server from Consul, blocking until the request completes.
        /// </summary>
        /// <remarks>
        ///     Failures are logged rather than thrown.
        /// </remarks>
        protected override void DeregisterServer()
        {
            DeregisterServerAsync().GetAwaiter().GetResult();
        }

        /// <summary>
        ///     Deregisters the server with Consul, logging (not throwing) any failure.
        /// </summary>
        /// <returns>A task object for the operation.</returns>
        private async Task DeregisterServerAsync()
        {
            try
            {
                await client.Agent.ServiceDeregister(RemoteServerManager.ServerID.ToString()).ConfigureAwait(false);
            }
            catch (Exception e)
            {
                Logger.Error("Failed to deregister server from Consul as an exception occurred.", e);
            }
        }

        /// <summary>
        ///     Registers this server with Consul, blocking until registration completes.
        /// </summary>
        /// <param name="group">The group the server is in.</param>
        /// <param name="host">The advertised host property of the server.</param>
        /// <param name="port">The advertised port property of the server.</param>
        /// <param name="properties">Additional properties supplied by the server.</param>
        /// <returns>The ID allocated to this server.</returns>
        //TODO in future, when supported by the core DarkRift libraries, this should probably be an async method
        protected override ushort RegisterServer(string group, string host, ushort port, IDictionary<string, string> properties)
        {
            return RegisterServerAsync(group, host, port, properties).GetAwaiter().GetResult();
        }

        /// <summary>
        ///     Registers the server with Consul, fetches the initial service list and starts the refresh timer.
        /// </summary>
        /// <param name="group">The group the server is in.</param>
        /// <param name="host">The advertised host property of the server.</param>
        /// <param name="port">The advertised port property of the server.</param>
        /// <param name="properties">Additional properties supplied by the server.</param>
        /// <returns>A task object for the operation with the ID allocated to this server.</returns>
        private async Task<ushort> RegisterServerAsync(string group, string host, int port, IDictionary<string, string> properties)
        {
            // The caller blocks on this task, so every await uses ConfigureAwait(false) to avoid deadlocking
            // if RegisterServer is invoked on a thread with a synchronization context.
            ushort id = await AllocateID().ConfigureAwait(false);

            Logger.Trace("Registering server on Consul...");

            AgentServiceCheck healthCheck = new AgentServiceCheck
            {
                HTTP = healthCheckUrl,
                Interval = healthCheckPollInterval,
                DeregisterCriticalServiceAfter = healthCheckTimeout
            };

            AgentServiceRegistration service = new AgentServiceRegistration
            {
                ID = id.ToString(),
                Name = serviceName,
                Address = host,
                Port = port,
                Tags = new string[] { GroupTagPrefix + group },
                Meta = properties,
                Check = healthCheck
            };

            try
            {
                await client.Agent.ServiceRegister(service).ConfigureAwait(false);
            }
            catch (Exception e)
            {
                Logger.Error("Failed to register server with Consul as an exception occurred.", e);
                throw;
            }

            // Get an initial list, then keep it up to date on a timer.
            // NOTE(review): this first fetch runs before DarkRift has been told our ID, so our own
            // registration may be diffed as "joined" — confirm HandleServerJoin tolerates that.
            await FetchServices().ConfigureAwait(false);
            CreateTimer(ServiceRefreshIntervalMs, ServiceRefreshIntervalMs, _ => RefreshServicesFromTimer());

            return id;
        }

        /// <summary>
        ///     Timer callback that refreshes the service list. A failed refresh is logged instead of escaping
        ///     into the timer infrastructure, and the next tick tries again.
        /// </summary>
        private void RefreshServicesFromTimer()
        {
            try
            {
                FetchServices().GetAwaiter().GetResult();
            }
            catch (Exception e)
            {
                Logger.Error("Failed to refresh services from Consul as an exception occurred.", e);
            }
        }

        /// <summary>
        ///     Allocates a new ID from Consul.
        /// </summary>
        /// <remarks>
        ///     The next free ID is stored as text under <see cref="NextIdKey"/>. It is read, incremented and
        ///     written back with a compare-and-swap, retrying when another server wins the race.
        /// </remarks>
        /// <returns>A task object for the operation with the ID allocated.</returns>
        /// <exception cref="InvalidOperationException">
        ///     Thrown if the stored value is not a valid ushort or if every attempt loses the race.
        /// </exception>
        private async Task<ushort> AllocateID()
        {
            for (int attempt = 0; attempt < MaxIdAllocationAttempts; attempt++)
            {
                Logger.Trace("Allocating a new ID on Consul, attempt: " + (attempt + 1));

                QueryResult<KVPair> result = await client.KV.Get(NextIdKey).ConfigureAwait(false);
                KVPair kvPair = result.Response;

                if (kvPair != null)
                {
                    // A key that exists but holds no value is as unusable as one holding garbage.
                    if (kvPair.Value == null || !ushort.TryParse(Encoding.UTF8.GetString(kvPair.Value), out ushort id))
                    {
                        throw new InvalidOperationException("Failed to allocate ID as the stored next ID is not a valid ushort.");
                    }

                    kvPair.Value = Encoding.UTF8.GetBytes((id + 1).ToString());

                    // The pair carries the ModifyIndex we read, so the CAS fails (and we retry) if another
                    // server updated the counter in the meantime.
                    if (await CompareAndSwapAsync(kvPair, "Failed to perform CAS operation on Consul while updating ID field.").ConfigureAwait(false))
                    {
                        return id;
                    }
                }
                else
                {
                    // First in the cluster, we need to create the next-id field! We take ID 0 and leave 1 as the next ID.
                    KVPair newPair = new KVPair(NextIdKey)
                    {
                        Value = Encoding.UTF8.GetBytes("1")
                    };

                    if (await CompareAndSwapAsync(newPair, "Failed to perform CAS operation on Consul while creating ID field.").ConfigureAwait(false))
                    {
                        return 0;
                    }
                }
            }

            string message = $"Failed to allocate ID from Consul as the operation exceeded the maximum number of allowed attempts ({MaxIdAllocationAttempts}).";
            Logger.Error(message);
            throw new InvalidOperationException(message);
        }

        /// <summary>
        ///     Performs a Consul KV compare-and-swap, logging and rethrowing (stack trace intact) on failure.
        /// </summary>
        /// <param name="kvPair">The pair to write.</param>
        /// <param name="failureMessage">The message to log if the request throws.</param>
        /// <returns>A task object for the operation with whether the swap was applied.</returns>
        private async Task<bool> CompareAndSwapAsync(KVPair kvPair, string failureMessage)
        {
            try
            {
                WriteResult<bool> casResult = await client.KV.CAS(kvPair).ConfigureAwait(false);
                return casResult.Response;
            }
            catch (Exception e)
            {
                Logger.Error(failureMessage, e);
                throw;
            }
        }

        /// <summary>
        ///     Disposes of the client and then of the base plugin.
        /// </summary>
        /// <param name="disposing">If we are disposing.</param>
        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                client.Dispose();
            }

            base.Dispose(disposing);
        }
    }
}