-
Notifications
You must be signed in to change notification settings - Fork 2k
/
AdoNetClusteringTable.cs
213 lines (189 loc) · 8.93 KB
/
AdoNetClusteringTable.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Orleans.Clustering.AdoNet.Storage;
using Orleans.Configuration;
namespace Orleans.Runtime.MembershipService
{
/// <summary>
/// <see cref="IMembershipTable"/> implementation that forwards every membership operation to a
/// <see cref="RelationalOrleansQueries"/> instance built from the configured invariant and
/// connection string.
/// </summary>
/// <remarks>
/// <see cref="InitializeMembershipTable"/> creates the query object; the other members dereference
/// it without a null check, so it has to be called (and awaited) first.
/// </remarks>
public class AdoNetClusteringTable : IMembershipTable
{
    private readonly string clusterId;
    private readonly IServiceProvider serviceProvider;
    private readonly ILogger logger;
    private readonly AdoNetClusteringSiloOptions clusteringTableOptions;

    // Assigned by InitializeMembershipTable, therefore not readonly.
    private RelationalOrleansQueries orleansQueries;

    /// <summary>
    /// Captures the services and option values used by the table. No database access happens here.
    /// </summary>
    /// <param name="serviceProvider">Used later to resolve <see cref="GrainReferenceKeyStringConverter"/>.</param>
    /// <param name="clusterOptions">Supplies the cluster id passed to the membership queries.</param>
    /// <param name="clusteringOptions">Supplies the invariant and connection string.</param>
    /// <param name="logger">Logger for trace, debug and informational messages.</param>
    public AdoNetClusteringTable(
        IServiceProvider serviceProvider,
        IOptions<ClusterOptions> clusterOptions,
        IOptions<AdoNetClusteringSiloOptions> clusteringOptions,
        ILogger<AdoNetClusteringTable> logger)
    {
        this.serviceProvider = serviceProvider;
        this.logger = logger;
        this.clusteringTableOptions = clusteringOptions.Value;
        this.clusterId = clusterOptions.Value.ClusterId;
    }

    /// <summary>
    /// Creates the <see cref="RelationalOrleansQueries"/> instance and, when requested, tries to
    /// insert the initial membership version row.
    /// </summary>
    /// <param name="tryInitTableVersion">
    /// When <c>true</c>, an insert of the membership version row for this cluster is attempted.
    /// </param>
    public async Task InitializeMembershipTable(bool tryInitTableVersion)
    {
        if (logger.IsEnabled(LogLevel.Trace))
        {
            logger.LogTrace("AdoNetClusteringTable.InitializeMembershipTable called.");
        }

        //This initializes all of Orleans operational queries from the database using a well known view
        //and assumes the database with appropriate definitions exists already.
        var grainReferenceConverter = this.serviceProvider.GetRequiredService<GrainReferenceKeyStringConverter>();
        orleansQueries = await RelationalOrleansQueries.CreateInstance(
            clusteringTableOptions.Invariant,
            clusteringTableOptions.ConnectionString,
            grainReferenceConverter);

        // even if I am not the one who created the table,
        // try to insert an initial table version if it is not already there,
        // so we always have a first table version row, before this silo starts working.
        if (tryInitTableVersion)
        {
            var wasCreated = await InitTableAsync();
            if (wasCreated)
            {
                logger.LogInformation("Created new table version row.");
            }
        }
    }

    /// <summary>
    /// Reads the membership data for a single silo of this cluster.
    /// </summary>
    /// <param name="key">Address of the silo to look up.</param>
    /// <returns>The result of the underlying read-row query.</returns>
    public async Task<MembershipTableData> ReadRow(SiloAddress key)
    {
        if (logger.IsEnabled(LogLevel.Trace))
        {
            logger.LogTrace("AdoNetClusteringTable.ReadRow called with key: {Key}.", key);
        }

        try
        {
            return await orleansQueries.MembershipReadRowAsync(this.clusterId, key);
        }
        catch (Exception ex)
        {
            // Log for diagnostics only; the caller still sees the original exception.
            if (logger.IsEnabled(LogLevel.Debug))
            {
                logger.LogDebug(ex, "AdoNetClusteringTable.ReadRow failed");
            }

            throw;
        }
    }

    /// <summary>
    /// Reads the membership data of this cluster.
    /// </summary>
    /// <returns>The result of the underlying read-all query.</returns>
    public async Task<MembershipTableData> ReadAll()
    {
        if (logger.IsEnabled(LogLevel.Trace))
        {
            logger.LogTrace("AdoNetClusteringTable.ReadAll called.");
        }

        try
        {
            return await orleansQueries.MembershipReadAllAsync(this.clusterId);
        }
        catch (Exception ex)
        {
            if (logger.IsEnabled(LogLevel.Debug))
            {
                logger.LogDebug(ex, "AdoNetClusteringTable.ReadAll failed");
            }

            throw;
        }
    }

    /// <summary>
    /// Inserts a membership row, conditioned on the etag of the supplied table version.
    /// </summary>
    /// <param name="entry">Membership entry to insert. Must not be <c>null</c>.</param>
    /// <param name="tableVersion">Table version whose <c>VersionEtag</c> is passed to the query. Must not be <c>null</c>.</param>
    /// <returns>The boolean returned by the underlying insert query.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="entry"/> or <paramref name="tableVersion"/> is <c>null</c>.
    /// </exception>
    public async Task<bool> InsertRow(MembershipEntry entry, TableVersion tableVersion)
    {
        if (logger.IsEnabled(LogLevel.Trace))
        {
            logger.LogTrace(
                "AdoNetClusteringTable.InsertRow called with entry {Entry} and tableVersion {TableVersion}.",
                entry,
                tableVersion);
        }

        //The "tableVersion" parameter should always exist when inserting a row as Init should
        //have been called and membership version created and read. This is an optimization to
        //not to go through all the way to database to fail a conditional check on etag (which does
        //exist for the sake of robustness) as mandated by Orleans membership protocol.
        //Likewise, no update can be done without membership entry.
        if (entry == null)
        {
            if (logger.IsEnabled(LogLevel.Debug))
            {
                logger.LogDebug("AdoNetClusteringTable.InsertRow aborted due to null check. MembershipEntry is null.");
            }

            throw new ArgumentNullException(nameof(entry));
        }

        if (tableVersion == null)
        {
            if (logger.IsEnabled(LogLevel.Debug))
            {
                logger.LogDebug("AdoNetClusteringTable.InsertRow aborted due to null check. TableVersion is null ");
            }

            throw new ArgumentNullException(nameof(tableVersion));
        }

        try
        {
            return await orleansQueries.InsertMembershipRowAsync(this.clusterId, entry, tableVersion.VersionEtag);
        }
        catch (Exception ex)
        {
            if (logger.IsEnabled(LogLevel.Debug))
            {
                logger.LogDebug(ex, "AdoNetClusteringTable.InsertRow failed");
            }

            throw;
        }
    }

    /// <summary>
    /// Updates a membership row, conditioned on the etag of the supplied table version.
    /// </summary>
    /// <param name="entry">Membership entry to write. Must not be <c>null</c>.</param>
    /// <param name="etag">Only written to the trace log by this implementation; it is not forwarded to the query.</param>
    /// <param name="tableVersion">Table version whose <c>VersionEtag</c> is passed to the query. Must not be <c>null</c>.</param>
    /// <returns>The boolean returned by the underlying update query.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="entry"/> or <paramref name="tableVersion"/> is <c>null</c>.
    /// </exception>
    public async Task<bool> UpdateRow(MembershipEntry entry, string etag, TableVersion tableVersion)
    {
        if (logger.IsEnabled(LogLevel.Trace))
        {
            logger.LogTrace("IMembershipTable.UpdateRow called with entry {Entry}, etag {ETag} and tableVersion {TableVersion}.", entry, etag, tableVersion);
        }

        //The "tableVersion" parameter should always exist when updating a row as Init should
        //have been called and membership version created and read. This is an optimization to
        //not to go through all the way to database to fail a conditional check (which does
        //exist for the sake of robustness) as mandated by Orleans membership protocol.
        //Likewise, no update can be done without membership entry or an etag.
        // NOTE(review): despite the sentence above, "etag" is neither null-checked nor used below;
        // the conditional check relies on tableVersion.VersionEtag only. Confirm this is intended.
        if (entry == null)
        {
            if (logger.IsEnabled(LogLevel.Debug))
            {
                logger.LogDebug("AdoNetClusteringTable.UpdateRow aborted due to null check. MembershipEntry is null.");
            }

            throw new ArgumentNullException(nameof(entry));
        }

        if (tableVersion == null)
        {
            if (logger.IsEnabled(LogLevel.Debug))
            {
                logger.LogDebug("AdoNetClusteringTable.UpdateRow aborted due to null check. TableVersion is null");
            }

            throw new ArgumentNullException(nameof(tableVersion));
        }

        try
        {
            return await orleansQueries.UpdateMembershipRowAsync(this.clusterId, entry, tableVersion.VersionEtag);
        }
        catch (Exception ex)
        {
            if (logger.IsEnabled(LogLevel.Debug))
            {
                logger.LogDebug(ex, "AdoNetClusteringTable.UpdateRow failed");
            }

            throw;
        }
    }

    /// <summary>
    /// Writes the entry's <c>IAmAliveTime</c> for the entry's silo address.
    /// </summary>
    /// <param name="entry">Entry supplying <c>SiloAddress</c> and <c>IAmAliveTime</c>. Must not be <c>null</c>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="entry"/> is <c>null</c>.</exception>
    public async Task UpdateIAmAlive(MembershipEntry entry)
    {
        if (logger.IsEnabled(LogLevel.Trace))
        {
            logger.LogTrace("IMembershipTable.UpdateIAmAlive called with entry {Entry}.", entry);
        }

        if (entry == null)
        {
            if (logger.IsEnabled(LogLevel.Debug))
            {
                logger.LogDebug("AdoNetClusteringTable.UpdateIAmAlive aborted due to null check. MembershipEntry is null.");
            }

            throw new ArgumentNullException(nameof(entry));
        }

        try
        {
            await orleansQueries.UpdateIAmAliveTimeAsync(this.clusterId, entry.SiloAddress, entry.IAmAliveTime);
        }
        catch (Exception ex)
        {
            if (logger.IsEnabled(LogLevel.Debug))
            {
                logger.LogDebug(ex, "AdoNetClusteringTable.UpdateIAmAlive failed");
            }

            throw;
        }
    }

    /// <summary>
    /// Deletes the membership table entries of the given cluster.
    /// </summary>
    /// <param name="clusterId">
    /// Cluster whose entries are deleted. This is the argument, not the cluster id captured in the
    /// constructor (the parameter shadows the field of the same name).
    /// </param>
    public async Task DeleteMembershipTableEntries(string clusterId)
    {
        if (logger.IsEnabled(LogLevel.Trace))
        {
            logger.LogTrace("IMembershipTable.DeleteMembershipTableEntries called with clusterId {ClusterId}.", clusterId);
        }

        try
        {
            await orleansQueries.DeleteMembershipTableEntriesAsync(clusterId);
        }
        catch (Exception ex)
        {
            if (logger.IsEnabled(LogLevel.Debug))
            {
                logger.LogDebug(ex, "AdoNetClusteringTable.DeleteMembershipTableEntries failed");
            }

            throw;
        }
    }

    /// <summary>
    /// Attempts to insert the membership version row for this cluster.
    /// </summary>
    /// <returns>The boolean returned by the underlying insert query.</returns>
    private async Task<bool> InitTableAsync()
    {
        try
        {
            return await orleansQueries.InsertMembershipVersionRowAsync(this.clusterId);
        }
        catch (Exception ex)
        {
            // Unlike the other members, this failure is logged at Trace level.
            if (logger.IsEnabled(LogLevel.Trace))
            {
                logger.LogTrace(ex, "Insert silo membership version failed");
            }

            throw;
        }
    }

    /// <summary>
    /// Not implemented by this table.
    /// </summary>
    /// <param name="beforeDate">Unused.</param>
    /// <exception cref="NotImplementedException">
    /// Always thrown, synchronously at the call site, because this method is not <c>async</c>.
    /// </exception>
    public Task CleanupDefunctSiloEntries(DateTimeOffset beforeDate)
    {
        throw new NotImplementedException();
    }
}
}